Updated Branches: refs/heads/trunk 4b54b8acd -> 811de14ec
sstables from stalled repair sessions become live after a reboot and can resurrect deleted data patch by jasobrown, reviewed by yukim for CASSANDRA-6503 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/36af4092 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/36af4092 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/36af4092 Branch: refs/heads/trunk Commit: 36af40925b3e8e01ede5ff6d7ed9a16046409fe3 Parents: b5a2b65 Author: Jason Brown <[email protected]> Authored: Thu Jan 30 16:17:01 2014 -0800 Committer: Jason Brown <[email protected]> Committed: Thu Jan 30 16:17:01 2014 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 28 ++++- .../apache/cassandra/io/sstable/Descriptor.java | 20 ++- .../apache/cassandra/io/sstable/SSTable.java | 2 +- .../org/apache/cassandra/io/util/FileUtils.java | 2 +- .../cassandra/streaming/StreamLockfile.java | 121 +++++++++++++++++++ .../cassandra/streaming/StreamReader.java | 5 +- .../cassandra/streaming/StreamReceiveTask.java | 72 ++++++++--- .../cassandra/streaming/StreamSession.java | 16 ++- .../cassandra/streaming/StreamTransferTask.java | 10 +- .../compress/CompressedStreamReader.java | 4 +- .../streaming/messages/FileMessage.java | 112 ----------------- .../streaming/messages/IncomingFileMessage.java | 78 ++++++++++++ .../streaming/messages/OutgoingFileMessage.java | 92 ++++++++++++++ .../streaming/messages/StreamMessage.java | 18 ++- 15 files changed, 429 insertions(+), 152 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 531ac15..56a72ef 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -16,6 +16,7 @@ * Release sstables 
upon rebuilding 2i (CASSANDRA-6635) * Add AbstractCompactionStrategy.startup() method (CASSANDRA-6637) * SSTableScanner may skip rows during cleanup (CASSANDRA-6638) + * sstables from stalled repair sessions can resurrect deleted data (CASSANDRA-6503) Merged from 1.2: * fsync compression metadata (CASSANDRA-6531) * Validate CF existence on execution for prepared statement (CASSANDRA-6535) http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 8d09453..8750026 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -18,6 +18,7 @@ package org.apache.cassandra.db; import java.io.File; +import java.io.FileFilter; import java.io.IOException; import java.lang.management.ManagementFactory; import java.nio.ByteBuffer; @@ -66,6 +67,7 @@ import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.ColumnFamilyMetrics; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.streaming.StreamLockfile; import org.apache.cassandra.thrift.IndexExpression; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.*; @@ -424,9 +426,33 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean */ public static void scrubDataDirectories(String keyspaceName, String columnFamily) { + Directories directories = Directories.create(keyspaceName, columnFamily); + + // remove any left-behind SSTables from failed/stalled streaming + FileFilter filter = new FileFilter() + { + public boolean accept(File pathname) + { + return pathname.toString().endsWith(StreamLockfile.FILE_EXT); + } + }; + for (File dir : 
directories.getCFDirectories()) + { + File[] lockfiles = dir.listFiles(filter); + if (lockfiles.length == 0) + continue; + logger.info("Removing SSTables from failed streaming session. Found {} files to cleanup.", lockfiles.length); + + for (File lockfile : lockfiles) + { + StreamLockfile streamLockfile = new StreamLockfile(lockfile); + streamLockfile.cleanup(); + streamLockfile.delete(); + } + } + logger.debug("Removing compacted SSTable files from {} (see http://wiki.apache.org/cassandra/MemtableSSTable)", columnFamily); - Directories directories = Directories.create(keyspaceName, columnFamily); for (Map.Entry<Descriptor,Set<Component>> sstableFiles : directories.sstableLister().list().entrySet()) { Descriptor desc = sstableFiles.getKey(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/io/sstable/Descriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java index 1b29c1c..d65da45 100644 --- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java +++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java @@ -201,7 +201,18 @@ public class Descriptor public static Descriptor fromFilename(String filename) { File file = new File(filename); - return fromFilename(file.getParentFile(), file.getName()).left; + return fromFilename(file.getParentFile(), file.getName(), false).left; + } + + public static Descriptor fromFilename(String filename, boolean skipComponent) + { + File file = new File(filename); + return fromFilename(file.getParentFile(), file.getName(), skipComponent).left; + } + + public static Pair<Descriptor,String> fromFilename(File directory, String name) + { + return fromFilename(directory, name, false); } /** @@ -209,10 +220,11 @@ public class Descriptor * * @param directory The directory of the SSTable files * @param name The name of the SSTable file + * 
@param skipComponent true if the name param should not be parsed for a component tag * * @return A Descriptor for the SSTable, and the Component remainder. */ - public static Pair<Descriptor,String> fromFilename(File directory, String name) + public static Pair<Descriptor,String> fromFilename(File directory, String name, boolean skipComponent) { // tokenize the filename StringTokenizer st = new StringTokenizer(name, String.valueOf(separator)); @@ -239,7 +251,9 @@ public class Descriptor int generation = Integer.parseInt(nexttok); // component suffix - String component = st.nextToken(); + String component = null; + if (!skipComponent) + component = st.nextToken(); directory = directory != null ? directory : new File("."); return Pair.create(new Descriptor(version, directory, ksname, cfname, generation, temporary), component); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/io/sstable/SSTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java index eaa4522..69c6521 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTable.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java @@ -205,7 +205,7 @@ public abstract class SSTable /** * Discovers existing components for the descriptor. Slow: only intended for use outside the critical path. 
*/ - static Set<Component> componentsFor(final Descriptor desc) + public static Set<Component> componentsFor(final Descriptor desc) { try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/io/util/FileUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java index 6b91bd3..0d8538e 100644 --- a/src/java/org/apache/cassandra/io/util/FileUtils.java +++ b/src/java/org/apache/cassandra/io/util/FileUtils.java @@ -310,7 +310,7 @@ public class FileUtils return f.delete(); } - public static void delete(File[] files) + public static void delete(File... files) { for ( File file : files ) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/streaming/StreamLockfile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamLockfile.java b/src/java/org/apache/cassandra/streaming/StreamLockfile.java new file mode 100644 index 0000000..0eb01c5 --- /dev/null +++ b/src/java/org/apache/cassandra/streaming/StreamLockfile.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.streaming; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.util.*; + +import com.google.common.base.Charsets; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SSTableWriter; +import org.apache.cassandra.io.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Encapsulates the behavior for 'locking' any streamed sstables to a node. + * If a process crashes while converting a set of SSTableWriters to SSTReaders + * (meaning, some subset of SSTWs were converted, but not the entire set), we want + * to disregard the entire set as we will surely have missing data (by definition). + * + * Basic behavior is to write out the names of all SSTWs to a file, one SSTW per line, + * and then delete the file when complete (normal behavior). This should happen before + * converting any SSTWs. Thus, the lockfile is created, some SSTWs are converted, + * and if the process crashes, on restart, we look for any existing lockfile, and delete + * any referenced SSTRs. 
+ */ +public class StreamLockfile +{ + public static final String FILE_EXT = ".lockfile"; + private static final Logger logger = LoggerFactory.getLogger(StreamLockfile.class); + + private final File lockfile; + + public StreamLockfile(File directory, UUID uuid) + { + lockfile = new File(directory, uuid.toString() + FILE_EXT); + } + + public StreamLockfile(File lockfile) + { + assert lockfile != null; + this.lockfile = lockfile; + } + + public void create(Collection<SSTableWriter> sstables) + { + List<String> sstablePaths = new ArrayList<>(sstables.size()); + for (SSTableWriter writer : sstables) + { + /* write out the file names *without* the 'tmp-file' flag in the file name. + this class will not need to clean up tmp files (on restart), CassandraDaemon does that already, + just make sure we delete the fully-formed SSTRs. */ + sstablePaths.add(writer.descriptor.asTemporary(false).baseFilename()); + } + + try + { + Files.write(lockfile.toPath(), sstablePaths, Charsets.UTF_8, + StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE, StandardOpenOption.DSYNC); + } + catch (IOException e) + { + logger.warn(String.format("Could not create lockfile %s for stream session, nothing to worry too much about", lockfile), e); + } + } + + public void delete() + { + FileUtils.delete(lockfile); + } + + public void cleanup() + { + List<String> files = readLockfile(lockfile); + for (String file : files) + { + try + { + Descriptor desc = Descriptor.fromFilename(file, true); + SSTable.delete(desc, SSTable.componentsFor(desc)); + } + catch (Exception e) + { + logger.warn("failed to delete a potentially stale sstable {}", file); + } + } + } + + private List<String> readLockfile(File lockfile) + { + try + { + return Files.readAllLines(lockfile.toPath(), Charsets.UTF_8); + } + catch (IOException e) + { + logger.info("couldn't read lockfile {}, ignoring", lockfile.getAbsolutePath()); + return Collections.emptyList(); + } + } + +} 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java index d72cb5e..72c239c 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ b/src/java/org/apache/cassandra/streaming/StreamReader.java @@ -36,7 +36,6 @@ import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.sstable.SSTableWriter; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.messages.FileMessageHeader; @@ -71,7 +70,7 @@ public class StreamReader * @return SSTable transferred * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails. 
*/ - public SSTableReader read(ReadableByteChannel channel) throws IOException + public SSTableWriter read(ReadableByteChannel channel) throws IOException { long totalSize = totalSize(); @@ -89,7 +88,7 @@ public class StreamReader // TODO move this to BytesReadTracker session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize); } - return writer.closeAndOpenReader(); + return writer; } catch (Throwable e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java index ac21352..9a2568d 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java @@ -19,12 +19,15 @@ package org.apache.cassandra.streaming; import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.UUID; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.SSTableWriter; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.Pair; /** @@ -36,16 +39,17 @@ public class StreamReceiveTask extends StreamTask private final int totalFiles; // total size of files to receive private final long totalSize; + private volatile boolean aborted; // holds references to SSTables received - protected Collection<SSTableReader> sstables; + protected Collection<SSTableWriter> sstables; public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize) { super(session, cfId); this.totalFiles = totalFiles; this.totalSize = totalSize; - this.sstables = new ArrayList<>(totalFiles); + 
this.sstables = new ArrayList<>(totalFiles); } /** @@ -53,9 +57,10 @@ public class StreamReceiveTask extends StreamTask * * @param sstable SSTable file received. */ - public void received(SSTableReader sstable) + public void received(SSTableWriter sstable) { assert cfId.equals(sstable.metadata.cfId); + assert !aborted; sstables.add(sstable); if (sstables.size() == totalFiles) @@ -72,24 +77,61 @@ public class StreamReceiveTask extends StreamTask return totalSize; } - // TODO should be run in background so that this does not block streaming private void complete() { - if (!SSTableReader.acquireReferences(sstables)) - throw new AssertionError("We shouldn't fail acquiring a reference on a sstable that has just been transferred"); - try + if (!sstables.isEmpty()) + StorageService.tasks.submit(new OnCompletionRunnable(this)); + } + + private static class OnCompletionRunnable implements Runnable + { + private final StreamReceiveTask task; + + public OnCompletionRunnable(StreamReceiveTask task) { - Pair<String, String> kscf = Schema.instance.getCF(cfId); - ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); - // add sstables and build secondary indexes - cfs.addSSTables(sstables); - cfs.indexManager.maybeBuildSecondaryIndexes(sstables, cfs.indexManager.allIndexesNames()); + this.task = task; } - finally + + public void run() { - SSTableReader.releaseReferences(sstables); + Pair<String, String> kscf = Schema.instance.getCF(task.cfId); + ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); + + StreamLockfile lockfile = new StreamLockfile(cfs.directories.getWriteableLocationAsFile(), UUID.randomUUID()); + lockfile.create(task.sstables); + List<SSTableReader> readers = new ArrayList<>(); + for (SSTableWriter writer : task.sstables) + readers.add(writer.closeAndOpenReader()); + lockfile.delete(); + + if (!SSTableReader.acquireReferences(readers)) + throw new AssertionError("We shouldn't fail acquiring a reference on a 
sstable that has just been transferred"); + try + { + // add sstables and build secondary indexes + cfs.addSSTables(readers); + cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames()); + } + finally + { + SSTableReader.releaseReferences(readers); + } + + task.session.taskCompleted(task); } + } - session.taskCompleted(this); + public void abort() + { + aborted = true; + Runnable r = new Runnable() + { + public void run() + { + for (SSTableWriter writer : sstables) + writer.abort(); + } + }; + StorageService.tasks.submit(r); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 98a76fc..4777995 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -23,6 +23,7 @@ import java.util.*; import java.util.concurrent.Future; import com.google.common.collect.Lists; +import org.apache.cassandra.io.sstable.SSTableWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -312,6 +313,12 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe { state(finalState); + if (finalState == State.FAILED) + { + for (StreamReceiveTask srt : receivers.values()) + srt.abort(); + } + // Note that we shouldn't block on this close because this method is called on the handler // incoming thread (so we would deadlock). 
handler.close(); @@ -359,7 +366,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe break; case FILE: - receive((FileMessage) message); + receive((IncomingFileMessage) message); break; case RECEIVED: @@ -458,7 +465,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe * * @param message received file */ - public void receive(FileMessage message) + public void receive(IncomingFileMessage message) { long headerSize = message.header.size(); StreamingMetrics.totalIncomingBytes.inc(headerSize); @@ -487,7 +494,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe */ public void retry(UUID cfId, int sequenceNumber) { - FileMessage message = transfers.get(cfId).createMessageForRetry(sequenceNumber); + OutgoingFileMessage message = transfers.get(cfId).createMessageForRetry(sequenceNumber); handler.sendMessage(message); } @@ -502,6 +509,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe } else { + handler.sendMessage(new CompleteMessage()); state(State.WAIT_COMPLETE); } } @@ -623,7 +631,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe state(State.STREAMING); for (StreamTransferTask task : transfers.values()) { - Collection<FileMessage> messages = task.getFileMessages(); + Collection<OutgoingFileMessage> messages = task.getFileMessages(); if (messages.size() > 0) handler.sendMessages(messages); else http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/streaming/StreamTransferTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java index 61ad058..8e461cc 100644 --- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java @@ -21,7 
+21,7 @@ import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.streaming.messages.FileMessage; +import org.apache.cassandra.streaming.messages.OutgoingFileMessage; import org.apache.cassandra.utils.Pair; /** @@ -31,7 +31,7 @@ public class StreamTransferTask extends StreamTask { private final AtomicInteger sequenceNumber = new AtomicInteger(0); - private final Map<Integer, FileMessage> files = new HashMap<>(); + private final Map<Integer, OutgoingFileMessage> files = new HashMap<>(); private long totalSize; @@ -43,7 +43,7 @@ public class StreamTransferTask extends StreamTask public void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections) { assert sstable != null && cfId.equals(sstable.metadata.cfId); - FileMessage message = new FileMessage(sstable, sequenceNumber.getAndIncrement(), estimatedKeys, sections); + OutgoingFileMessage message = new OutgoingFileMessage(sstable, sequenceNumber.getAndIncrement(), estimatedKeys, sections); files.put(message.header.sequenceNumber, message); totalSize += message.header.size(); } @@ -71,14 +71,14 @@ public class StreamTransferTask extends StreamTask return totalSize; } - public Collection<FileMessage> getFileMessages() + public Collection<OutgoingFileMessage> getFileMessages() { // We may race between queuing all those messages and the completion of the completion of // the first ones. 
So copy the values to avoid a ConcurrentModificationException return new ArrayList<>(files.values()); } - public FileMessage createMessageForRetry(int sequenceNumber) + public OutgoingFileMessage createMessageForRetry(int sequenceNumber) { return files.get(sequenceNumber); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java index 6f5d0f5..4aac941 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java @@ -55,7 +55,7 @@ public class CompressedStreamReader extends StreamReader * @throws java.io.IOException if reading the remote sstable fails. Will throw an RTE if local write fails. 
*/ @Override - public SSTableReader read(ReadableByteChannel channel) throws IOException + public SSTableWriter read(ReadableByteChannel channel) throws IOException { long totalSize = totalSize(); @@ -81,7 +81,7 @@ public class CompressedStreamReader extends StreamReader session.progress(desc, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize); } } - return writer.closeAndOpenReader(); + return writer; } catch (Throwable e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/streaming/messages/FileMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessage.java b/src/java/org/apache/cassandra/streaming/messages/FileMessage.java deleted file mode 100644 index a0543c0..0000000 --- a/src/java/org/apache/cassandra/streaming/messages/FileMessage.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.streaming.messages; - -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.channels.Channels; -import java.nio.channels.ReadableByteChannel; -import java.nio.channels.WritableByteChannel; -import java.util.List; - -import org.apache.cassandra.io.compress.CompressionMetadata; -import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.streaming.StreamReader; -import org.apache.cassandra.streaming.StreamSession; -import org.apache.cassandra.streaming.StreamWriter; -import org.apache.cassandra.streaming.compress.CompressedStreamReader; -import org.apache.cassandra.streaming.compress.CompressedStreamWriter; -import org.apache.cassandra.streaming.compress.CompressionInfo; -import org.apache.cassandra.utils.Pair; - -/** - * FileMessage is used to transfer/receive the part(or whole) of a SSTable data file. - */ -public class FileMessage extends StreamMessage -{ - public static Serializer<FileMessage> serializer = new Serializer<FileMessage>() - { - public FileMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException - { - DataInputStream input = new DataInputStream(Channels.newInputStream(in)); - FileMessageHeader header = FileMessageHeader.serializer.deserialize(input, version); - StreamReader reader = header.compressionInfo == null ? 
new StreamReader(header, session) - : new CompressedStreamReader(header, session); - - try - { - return new FileMessage(reader.read(in), header); - } - catch (Throwable e) - { - session.doRetry(header, e); - return null; - } - } - - public void serialize(FileMessage message, WritableByteChannel out, int version, StreamSession session) throws IOException - { - DataOutput output = new DataOutputStream(Channels.newOutputStream(out)); - FileMessageHeader.serializer.serialize(message.header, output, version); - StreamWriter writer = message.header.compressionInfo == null ? - new StreamWriter(message.sstable, message.header.sections, session) : - new CompressedStreamWriter(message.sstable, - message.header.sections, - message.header.compressionInfo, session); - writer.write(out); - session.fileSent(message.header); - } - }; - - public final FileMessageHeader header; - public final SSTableReader sstable; - - public FileMessage(SSTableReader sstable, FileMessageHeader header) - { - super(Type.FILE); - this.header = header; - this.sstable = sstable; - } - - public FileMessage(SSTableReader sstable, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections) - { - super(Type.FILE); - this.sstable = sstable; - - CompressionInfo compressionInfo = null; - if (sstable.compression) - { - CompressionMetadata meta = sstable.getCompressionMetadata(); - compressionInfo = new CompressionInfo(meta.getChunksForSections(sections), meta.parameters); - } - this.header = new FileMessageHeader(sstable.metadata.cfId, - sequenceNumber, - sstable.descriptor.version.toString(), - estimatedKeys, - sections, - compressionInfo); - } - - @Override - public String toString() - { - return "File (" + header + ", file: " + sstable.getFilename() + ")"; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java new file mode 100644 index 0000000..a403390 --- /dev/null +++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.streaming.messages; + +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; + +import org.apache.cassandra.io.sstable.SSTableWriter; +import org.apache.cassandra.streaming.StreamReader; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.compress.CompressedStreamReader; + +/** + * IncomingFileMessage is used to receive the part(or whole) of a SSTable data file. 
+ */ +public class IncomingFileMessage extends StreamMessage +{ + public static Serializer<IncomingFileMessage> serializer = new Serializer<IncomingFileMessage>() + { + public IncomingFileMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException + { + DataInputStream input = new DataInputStream(Channels.newInputStream(in)); + FileMessageHeader header = FileMessageHeader.serializer.deserialize(input, version); + StreamReader reader = header.compressionInfo == null ? new StreamReader(header, session) + : new CompressedStreamReader(header, session); + + try + { + return new IncomingFileMessage(reader.read(in), header); + } + catch (Throwable e) + { + session.doRetry(header, e); + return null; + } + } + + public void serialize(IncomingFileMessage message, WritableByteChannel out, int version, StreamSession session) throws IOException + { + throw new UnsupportedOperationException("Not allowed to call serialize on an incoming file"); + } + }; + + public FileMessageHeader header; + public SSTableWriter sstable; + + public IncomingFileMessage(SSTableWriter sstable, FileMessageHeader header) + { + super(Type.FILE); + this.header = header; + this.sstable = sstable; + } + + @Override + public String toString() + { + return "File (" + header + ", file: " + sstable.getFilename() + ")"; + } +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java new file mode 100644 index 0000000..1fa115f --- /dev/null +++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.streaming.messages; + +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.util.List; + +import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.StreamWriter; +import org.apache.cassandra.streaming.compress.CompressedStreamWriter; +import org.apache.cassandra.streaming.compress.CompressionInfo; +import org.apache.cassandra.utils.Pair; + +/** + * OutgoingFileMessage is used to transfer the part(or whole) of a SSTable data file. 
+ */ +public class OutgoingFileMessage extends StreamMessage /* send-side FILE message: holds the SSTableReader to stream and the FileMessageHeader built from it */ +{ + public static Serializer<OutgoingFileMessage> serializer = new Serializer<OutgoingFileMessage>() + { + public OutgoingFileMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException /* unsupported for this type: always throws UnsupportedOperationException */ + { + throw new UnsupportedOperationException("Not allowed to call deserialize on an outgoing file"); + } + + public void serialize(OutgoingFileMessage message, WritableByteChannel out, int version, StreamSession session) throws IOException /* writes the header first, then the file data through a writer, then calls session.fileSent */ + { + DataOutput output = new DataOutputStream(Channels.newOutputStream(out)); + FileMessageHeader.serializer.serialize(message.header, output, version); + + final SSTableReader reader = message.sstable; + StreamWriter writer = message.header.compressionInfo == null ? + new StreamWriter(reader, message.header.sections, session) : + new CompressedStreamWriter(reader, + message.header.sections, + message.header.compressionInfo, session); /* a null compressionInfo selects the plain StreamWriter, anything else the CompressedStreamWriter */ + writer.write(out); + session.fileSent(message.header); /* reached only when writer.write returned without throwing */ + } + }; + + public FileMessageHeader header; /* built in the constructor from the sstable and the requested sections */ + public SSTableReader sstable; /* source sstable handed to the writer in serialize */ + + public OutgoingFileMessage(SSTableReader sstable, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections) + { + super(Type.FILE); + this.sstable = sstable; + + CompressionInfo compressionInfo = null; + if (sstable.compression) /* compression info is built only when this flag is set, otherwise it stays null */ + { + CompressionMetadata meta = sstable.getCompressionMetadata(); + compressionInfo = new CompressionInfo(meta.getChunksForSections(sections), meta.parameters); + } + this.header = new FileMessageHeader(sstable.metadata.cfId, + sequenceNumber, + sstable.descriptor.version.toString(), + estimatedKeys, + sections, + compressionInfo); + } + + @Override + public String toString() + { + return "File (" + header + ", file: " + sstable.getFilename() + ")"; + } +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/36af4092/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java index 9e146e8..7010c95 100644 --- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java @@ -42,7 +42,7 @@ public abstract class StreamMessage buff.flip(); while (buff.hasRemaining()) out.write(buff); - message.type.serializer.serialize(message, out, version, session); + message.type.outSerializer.serialize(message, out, version, session); } public static StreamMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException @@ -52,7 +52,7 @@ public abstract class StreamMessage { buff.flip(); Type type = Type.get(buff.get()); - return type.serializer.deserialize(in, version, session); + return type.inSerializer.deserialize(in, version, session); } else { @@ -73,7 +73,7 @@ public abstract class StreamMessage public static enum Type { PREPARE(1, 5, PrepareMessage.serializer), - FILE(2, 0, FileMessage.serializer), + FILE(2, 0, IncomingFileMessage.serializer, OutgoingFileMessage.serializer), RECEIVED(3, 4, ReceivedMessage.serializer), RETRY(4, 4, RetryMessage.serializer), COMPLETE(5, 1, CompleteMessage.serializer), @@ -91,14 +91,22 @@ public abstract class StreamMessage private final byte type; public final int priority; - public final Serializer<StreamMessage> serializer; + public final Serializer<StreamMessage> inSerializer; + public final Serializer<StreamMessage> outSerializer; @SuppressWarnings("unchecked") private Type(int type, int priority, Serializer serializer) { + this(type, priority, serializer, serializer); + } + + @SuppressWarnings("unchecked") + private Type(int type, int priority, Serializer inSerializer, Serializer outSerializer) + { this.type = (byte) type; this.priority = priority; - this.serializer = 
serializer; + this.inSerializer = inSerializer; + this.outSerializer = outSerializer; } }
