Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/645d8278 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/645d8278 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/645d8278 Branch: refs/heads/trunk Commit: 645d8278bcf6281c8272f82d0d661e386a7cbe7d Parents: dd091d4 dd9ae1d Author: Marcus Eriksson <[email protected]> Authored: Thu May 3 15:46:23 2018 +0200 Committer: Marcus Eriksson <[email protected]> Committed: Thu May 3 15:46:51 2018 +0200 ---------------------------------------------------------------------- .circleci/config.yml | 8 ++++++++ .../apache/cassandra/db/CassandraKeyspaceWriteHandler.java | 2 ++ .../apache/cassandra/db/streaming/CassandraStreamReader.java | 2 +- .../cassandra/db/streaming/CassandraStreamReceiver.java | 1 + src/java/org/apache/cassandra/net/MessageIn.java | 5 ++++- .../org/apache/cassandra/net/async/MessageInHandler.java | 5 ++++- 6 files changed, 20 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/645d8278/src/java/org/apache/cassandra/db/CassandraKeyspaceWriteHandler.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/CassandraKeyspaceWriteHandler.java index 1f1bcdb,0000000..efba11f mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/CassandraKeyspaceWriteHandler.java +++ b/src/java/org/apache/cassandra/db/CassandraKeyspaceWriteHandler.java @@@ -1,92 -1,0 +1,94 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db; + +import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.commitlog.CommitLogPosition; +import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.concurrent.OpOrder; + +public class CassandraKeyspaceWriteHandler implements KeyspaceWriteHandler +{ + private final Keyspace keyspace; + + public CassandraKeyspaceWriteHandler(Keyspace keyspace) + { + this.keyspace = keyspace; + } + + @Override ++ @SuppressWarnings("resource") // group is closed when CassandraWriteContext is closed + public WriteContext beginWrite(Mutation mutation, boolean makeDurable) throws RequestExecutionException + { + OpOrder.Group group = null; + try + { + group = Keyspace.writeOrder.start(); + + // write the mutation to the commitlog and memtables + CommitLogPosition position = null; + if (makeDurable) + { + Tracing.trace("Appending to commitlog"); + position = CommitLog.instance.add(mutation); + } + return new CassandraWriteContext(group, position); + } + catch (Throwable t) + { + if (group != null) + { + group.close(); + } + throw t; + } + } + ++ @SuppressWarnings("resource") // group is closed when CassandraWriteContext is closed + private WriteContext createEmptyContext() + { + OpOrder.Group group = null; + try + { + group = Keyspace.writeOrder.start(); + return new CassandraWriteContext(group, null); + } + catch (Throwable t) + { + if (group != null) + { + group.close(); + } + throw t; + } + } + + @Override + public WriteContext createContextForIndexing() + { + return createEmptyContext(); + } + + @Override + public WriteContext createContextForRead() + { + return createEmptyContext(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/645d8278/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java index 8e7b198,0000000..3930196 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java @@@ -1,285 -1,0 +1,285 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.streaming; + +import java.io.*; +import java.util.Collection; +import java.util.UUID; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.UnmodifiableIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.TrackedDataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.SSTableSimpleIterator; +import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.compress.StreamCompressionInputStream; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; +import org.apache.cassandra.streaming.messages.StreamMessage; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + +/** + * CassandraStreamReader reads from stream and writes to SSTable. + */ +public class CassandraStreamReader +{ + private static final Logger logger = LoggerFactory.getLogger(CassandraStreamReader.class); + protected final TableId tableId; + protected final long estimatedKeys; + protected final Collection<SSTableReader.PartitionPositionBounds> sections; + protected final StreamSession session; + protected final Version inputVersion; + protected final long repairedAt; + protected final UUID pendingRepair; + protected final SSTableFormat.Type format; + protected final int sstableLevel; + protected final SerializationHeader.Component header; + protected final int fileSeqNum; + + public CassandraStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session) + { + if (session.getPendingRepair() != null) + { + // we should only ever be streaming pending repair + // sstables if the session has a pending repair id + assert session.getPendingRepair().equals(header.pendingRepair); + } + this.session = session; + this.tableId = header.tableId; + this.estimatedKeys = streamHeader.estimatedKeys; + this.sections = streamHeader.sections; + this.inputVersion = streamHeader.version; + this.repairedAt = header.repairedAt; + this.pendingRepair = header.pendingRepair; + this.format = streamHeader.format; + this.sstableLevel = streamHeader.sstableLevel; + this.header = streamHeader.header; + this.fileSeqNum = header.sequenceNumber; + } + + /** + * @param inputPlus where this reads data from + * @return SSTable transferred + * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails. 
+ */ + @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed + public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException + { + long totalSize = totalSize(); + + ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId); + if (cfs == null) + { + // schema was dropped during streaming + throw new IOException("CF " + tableId + " was dropped during streaming"); + } + + logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}', pendingRepair = '{}'.", + session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(), + cfs.getTableName(), pendingRepair); + + StreamDeserializer deserializer = null; + SSTableMultiWriter writer = null; + try (StreamCompressionInputStream streamCompressionInputStream = new StreamCompressionInputStream(inputPlus, StreamMessage.CURRENT_VERSION)) + { + TrackedDataInputPlus in = new TrackedDataInputPlus(streamCompressionInputStream); + deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata())); + writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format); + while (in.getBytesRead() < totalSize) + { + writePartition(deserializer, writer); + // TODO move this to BytesReadTracker + session.progress(writer.getFilename(), ProgressInfo.Direction.IN, in.getBytesRead(), totalSize); + } + logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", + session.planId(), fileSeqNum, session.peer, FBUtilities.prettyPrintMemory(in.getBytesRead()), FBUtilities.prettyPrintMemory(totalSize)); + return writer; + } + catch (Throwable e) + { + Object partitionKey = deserializer != null ? deserializer.partitionKey() : ""; + logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.", + session.planId(), partitionKey, cfs.keyspace.getName(), cfs.getTableName(), e); + if (writer != null) + { + writer.abort(e); + } + throw Throwables.propagate(e); + } + } + + protected SerializationHeader getHeader(TableMetadata metadata) + { + return header != null? 
header.toHeader(metadata) : null; //pre-3.0 sstable have no SerializationHeader + } - ++ @SuppressWarnings("resource") + protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, UUID pendingRepair, SSTableFormat.Type format) throws IOException + { + Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize); + if (localDir == null) + throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize))); + + StreamReceiver streamReceiver = session.getAggregator(tableId); + Preconditions.checkState(streamReceiver instanceof CassandraStreamReceiver); + LifecycleTransaction txn = CassandraStreamReceiver.fromReceiver(session.getAggregator(tableId)).getTransaction(); + + RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, pendingRepair, format, sstableLevel, totalSize, txn, getHeader(cfs.metadata())); + return writer; + } + + protected long totalSize() + { + long size = 0; + for (SSTableReader.PartitionPositionBounds section : sections) + size += section.upperPosition - section.lowerPosition; + return size; + } + + protected void writePartition(StreamDeserializer deserializer, SSTableMultiWriter writer) throws IOException + { + writer.append(deserializer.newPartition()); + deserializer.checkForExceptions(); + } + + public static class StreamDeserializer extends UnmodifiableIterator<Unfiltered> implements UnfilteredRowIterator + { + private final TableMetadata metadata; + private final DataInputPlus in; + private final SerializationHeader header; + private final SerializationHelper helper; + + private DecoratedKey key; + private DeletionTime partitionLevelDeletion; + private SSTableSimpleIterator iterator; + private Row staticRow; + private IOException exception; + + public StreamDeserializer(TableMetadata metadata, DataInputPlus in, Version version, SerializationHeader header) throws IOException + { + this.metadata = metadata; + this.in = in; + this.helper = new SerializationHelper(metadata, version.correspondingMessagingVersion(), SerializationHelper.Flag.PRESERVE_SIZE); + this.header = header; + } + + public StreamDeserializer newPartition() throws IOException + { + key = metadata.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in)); + partitionLevelDeletion = DeletionTime.serializer.deserialize(in); + iterator = SSTableSimpleIterator.create(metadata, in, header, helper, partitionLevelDeletion); + staticRow = iterator.readStaticRow(); + return this; + } + + public TableMetadata metadata() + { + return metadata; + } + + public RegularAndStaticColumns columns() + { + // We don't know which columns we'll get so assume it can be all of them + return metadata.regularAndStaticColumns(); + } + + public boolean isReverseOrder() + { + return false; + } + + public DecoratedKey partitionKey() + { + return key; + } + + public DeletionTime partitionLevelDeletion() + { + return partitionLevelDeletion; + } + + public Row staticRow() + { + return staticRow; + } + + public EncodingStats stats() + { + return header.stats(); + } + + public boolean hasNext() + { + try + { + return iterator.hasNext(); + } + catch (IOError e) + { + if (e.getCause() != null && e.getCause() instanceof IOException) + { + exception = (IOException)e.getCause(); + return false; + } + throw e; + } + } + + public Unfiltered next() + { + // Note that in practice we know that IOException will be thrown by hasNext(), because that's + // where the actual reading happens, so 
we don't bother catching RuntimeException here (contrarily + // to what we do in hasNext) + Unfiltered unfiltered = iterator.next(); + return metadata.isCounter() && unfiltered.kind() == Unfiltered.Kind.ROW + ? maybeMarkLocalToBeCleared((Row) unfiltered) + : unfiltered; + } + + private Row maybeMarkLocalToBeCleared(Row row) + { + return metadata.isCounter() ? row.markCounterLocalToBeCleared() : row; + } + + public void checkForExceptions() throws IOException + { + if (exception != null) + throw exception; + } + + public void close() + { + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/645d8278/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java index 6a57e49,0000000..bb5531e mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java @@@ -1,248 -1,0 +1,249 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.ThrottledUnfilteredIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.view.View; +import org.apache.cassandra.dht.Bounds; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.streaming.IncomingStream; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.CloseableIterator; +import org.apache.cassandra.utils.Throwables; +import org.apache.cassandra.utils.concurrent.Refs; + +public class CassandraStreamReceiver implements StreamReceiver +{ + private static final Logger logger = LoggerFactory.getLogger(CassandraStreamReceiver.class); + + private static final int MAX_ROWS_PER_BATCH = Integer.getInteger("cassandra.repair.mutation_repair_rows_per_batch", 100); + + private final ColumnFamilyStore cfs; + private final StreamSession session; + + // Transaction tracking new files received + private final LifecycleTransaction txn; + + // holds references to SSTables received + protected Collection<SSTableReader> sstables; + + private final boolean requiresWritePath; + + + public CassandraStreamReceiver(ColumnFamilyStore cfs, StreamSession session, int totalFiles) + { + this.cfs = cfs; + this.session = session; + // this is an "offline" transaction, as we currently manually expose the sstables once done; + // this should be revisited at a later date, so that LifecycleTransaction manages all sstable state changes + this.txn = LifecycleTransaction.offline(OperationType.STREAM); + this.sstables = new ArrayList<>(totalFiles); + this.requiresWritePath = requiresWritePath(cfs); + } + + public LifecycleTransaction getTransaction() + { + return txn; + } + + public static CassandraStreamReceiver fromReceiver(StreamReceiver receiver) + { + Preconditions.checkArgument(receiver instanceof CassandraStreamReceiver); + return (CassandraStreamReceiver) receiver; + } + + private static CassandraIncomingFile getFile(IncomingStream stream) + { + Preconditions.checkArgument(stream instanceof CassandraIncomingFile, "Wrong stream type: {}", stream); + return (CassandraIncomingFile) stream; + } + + @Override ++ @SuppressWarnings("resource") + public void received(IncomingStream stream) + { + CassandraIncomingFile file = getFile(stream); + + Collection<SSTableReader> finished = null; + SSTableMultiWriter sstable = file.getSSTable(); + try + { + finished = sstable.finish(true); + } + catch (Throwable t) + { + Throwables.maybeFail(sstable.abort(t)); + } + txn.update(finished, false); + sstables.addAll(finished); + } + + @Override + public void discardStream(IncomingStream stream) + { + 
CassandraIncomingFile file = getFile(stream); + Throwables.maybeFail(file.getSSTable().abort(null)); + } + + @Override + public void abort() + { + sstables.clear(); + txn.abort(); + } + + private boolean hasViews(ColumnFamilyStore cfs) + { + return !Iterables.isEmpty(View.findAll(cfs.metadata.keyspace, cfs.getTableName())); + } + + private boolean hasCDC(ColumnFamilyStore cfs) + { + return cfs.metadata().params.cdc; + } + + /* + * We have a special path for views and for CDC. + * + * For views, since the view requires cleaning up any pre-existing state, we must put all partitions + * through the same write path as normal mutations. This also ensures any 2is are also updated. + * + * For CDC-enabled tables, we want to ensure that the mutations are run through the CommitLog so they + * can be archived by the CDC process on discard. + */ + private boolean requiresWritePath(ColumnFamilyStore cfs) { + return hasCDC(cfs) || (session.streamOperation().requiresViewBuild() && hasViews(cfs)); + } + + private void sendThroughWritePath(ColumnFamilyStore cfs, Collection<SSTableReader> readers) { + boolean hasCdc = hasCDC(cfs); + ColumnFilter filter = ColumnFilter.all(cfs.metadata()); + for (SSTableReader reader : readers) + { + Keyspace ks = Keyspace.open(reader.getKeyspaceName()); + // When doing mutation-based repair we split each partition into smaller batches + // ({@link Stream MAX_ROWS_PER_BATCH}) to avoid OOMing and generating heap pressure + try (ISSTableScanner scanner = reader.getScanner(); + CloseableIterator<UnfilteredRowIterator> throttledPartitions = ThrottledUnfilteredIterator.throttle(scanner, MAX_ROWS_PER_BATCH)) + { + while (throttledPartitions.hasNext()) + { + // MV *can* be applied unsafe if there's no CDC on the CFS as we flush + // before transaction is done. 
+ // + // If the CFS has CDC, however, these updates need to be written to the CommitLog + // so they get archived into the cdc_raw folder + ks.apply(new Mutation(PartitionUpdate.fromIterator(throttledPartitions.next(), filter)), + hasCdc, + true, + false); + } + } + } + } + + private synchronized void finishTransaction() + { + txn.finish(); + } + + @Override + public void finished() + { + boolean requiresWritePath = requiresWritePath(cfs); + Collection<SSTableReader> readers = sstables; + + try (Refs<SSTableReader> refs = Refs.ref(readers)) + { + if (requiresWritePath) + { + sendThroughWritePath(cfs, readers); + } + else + { + finishTransaction(); + + // add sstables (this will build secondary indexes too, see CASSANDRA-10130) + logger.debug("[Stream #{}] Received {} sstables from {} ({})", session.planId(), readers.size(), session.peer, readers); + cfs.addSSTables(readers); + + //invalidate row and counter cache + if (cfs.isRowCacheEnabled() || cfs.metadata().isCounter()) + { + List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size()); + readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()))); + Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate); + + if (cfs.isRowCacheEnabled()) + { + int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds); + if (invalidatedKeys > 0) + logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " + + "receive task completed.", session.planId(), invalidatedKeys, + cfs.keyspace.getName(), cfs.getTableName()); + } + + if (cfs.metadata().isCounter()) + { + int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds); + if (invalidatedKeys > 0) + logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " + + "receive task completed.", session.planId(), invalidatedKeys, + cfs.keyspace.getName(), cfs.getTableName()); + } + } + } + } + } + + @Override + public void cleanup() + { + // We don't keep the streamed sstables since we've applied them manually so we abort the txn and delete + // the streamed sstables. 
+ if (requiresWritePath) + { + cfs.forceBlockingFlush(); + abort(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/645d8278/src/java/org/apache/cassandra/net/MessageIn.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/net/MessageIn.java index 1cd7547,d06d515..7fb866f --- a/src/java/org/apache/cassandra/net/MessageIn.java +++ b/src/java/org/apache/cassandra/net/MessageIn.java @@@ -112,26 -97,15 +112,29 @@@ public class MessageIn<T for (int i = 0; i < parameterCount; i++) { String key = in.readUTF(); - byte[] value = new byte[in.readInt()]; - in.readFully(value); - builder.put(key, value); + ParameterType type = ParameterType.byName.get(key); + if (type != null) + { + byte[] value = new byte[in.readInt()]; + in.readFully(value); - builder.put(type, type.serializer.deserialize(new DataInputBuffer(value), version)); ++ try (DataInputBuffer buffer = new DataInputBuffer(value)) ++ { ++ builder.put(type, type.serializer.deserialize(buffer, version)); ++ } + } + else + { + in.skipBytes(in.readInt()); + } } - parameters = builder.build(); + return builder.build(); } + } - int payloadSize = in.readInt(); - IVersionedSerializer<T2> serializer = (IVersionedSerializer<T2>) MessagingService.instance().verbSerializers.get(verb); + public static <T2> MessageIn<T2> read(DataInputPlus in, int version, int id, long constructionTime, + InetAddressAndPort from, int payloadSize, Verb verb, Map<ParameterType, Object> parameters) throws IOException + { + IVersionedSerializer<T2> serializer = (IVersionedSerializer<T2>) MessagingService.verbSerializers.get(verb); if (serializer instanceof MessagingService.CallbackDeterminedSerializer) { CallbackInfo callback = MessagingService.instance().getRegisteredCallback(id); http://git-wip-us.apache.org/repos/asf/cassandra/blob/645d8278/src/java/org/apache/cassandra/net/async/MessageInHandler.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/net/async/MessageInHandler.java index 0423b80,0000000..b9cbd1a mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/net/async/MessageInHandler.java +++ b/src/java/org/apache/cassandra/net/async/MessageInHandler.java @@@ -1,317 -1,0 +1,320 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.net.async; + +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import org.apache.cassandra.db.monitoring.ApproximateTime; +import org.apache.cassandra.exceptions.UnknownTableException; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.CompactEndpointSerializationHelper; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.ParameterType; + +/** + * Parses out individual messages from the incoming buffers. Each message, both header and payload, is incrementally built up + * from the available input data, then passed to the {@link #messageConsumer}. + * + * Note: this class derives from {@link ByteToMessageDecoder} to take advantage of the {@link ByteToMessageDecoder.Cumulator} + * behavior across {@link #decode(ChannelHandlerContext, ByteBuf, List)} invocations. That way we don't have to maintain + * the not-fully consumed {@link ByteBuf}s. + */ +class MessageInHandler extends ByteToMessageDecoder +{ + public static final Logger logger = LoggerFactory.getLogger(MessageInHandler.class); + + /** + * The default target for consuming deserialized {@link MessageIn}. + */ + static final BiConsumer<MessageIn, Integer> MESSAGING_SERVICE_CONSUMER = (messageIn, id) -> MessagingService.instance().receive(messageIn, id); + + private enum State + { + READ_FIRST_CHUNK, + READ_IP_ADDRESS, + READ_SECOND_CHUNK, + READ_PARAMETERS_DATA, + READ_PAYLOAD_SIZE, + READ_PAYLOAD + } + + /** + * The byte count for magic, msg id, timestamp values. + */ + @VisibleForTesting + static final int FIRST_SECTION_BYTE_COUNT = 12; + + /** + * The byte count for the verb id and the number of parameters. + */ + private static final int SECOND_SECTION_BYTE_COUNT = 8; + + private final InetAddressAndPort peer; + private final int messagingVersion; + + /** + * Abstracts out depending directly on {@link MessagingService#receive(MessageIn, int)}; this makes tests more sane + * as they don't require nor trigger the entire message processing circus. + */ + private final BiConsumer<MessageIn, Integer> messageConsumer; + + private State state; + private MessageHeader messageHeader; + + MessageInHandler(InetAddressAndPort peer, int messagingVersion) + { + this (peer, messagingVersion, MESSAGING_SERVICE_CONSUMER); + } + + MessageInHandler(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer) + { + this.peer = peer; + this.messagingVersion = messagingVersion; + this.messageConsumer = messageConsumer; + state = State.READ_FIRST_CHUNK; + } + + /** + * For each new message coming in, builds up a {@link MessageHeader} instance incrementally. This method + * attempts to deserialize as much header information as it can out of the incoming {@link ByteBuf}, and + * maintains a trivial state machine to remember progress across invocations. 
+ */ + @SuppressWarnings("resource") + public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) + { + ByteBufDataInputPlus inputPlus = new ByteBufDataInputPlus(in); + try + { + while (true) + { + // an imperfect optimization around calling in.readableBytes() all the time + int readableBytes = in.readableBytes(); + + switch (state) + { + case READ_FIRST_CHUNK: + if (readableBytes < FIRST_SECTION_BYTE_COUNT) + return; + MessagingService.validateMagic(in.readInt()); + messageHeader = new MessageHeader(); + messageHeader.messageId = in.readInt(); + int messageTimestamp = in.readInt(); // make sure to read the sent timestamp, even if DatabaseDescriptor.hasCrossNodeTimeout() is not enabled + messageHeader.constructionTime = MessageIn.deriveConstructionTime(peer, messageTimestamp, ApproximateTime.currentTimeMillis()); + state = State.READ_IP_ADDRESS; + readableBytes -= FIRST_SECTION_BYTE_COUNT; + // fall-through + case READ_IP_ADDRESS: + // unfortunately, this assumes knowledge of how CompactEndpointSerializationHelper serializes data (the first byte is the size). + // first, check that we can actually read the size byte, then check if we can read that number of bytes. + // the "+ 1" is to make sure we have the size byte in addition to the serialized IP addr count of bytes in the buffer. + int serializedAddrSize; + if (readableBytes < 1 || readableBytes < (serializedAddrSize = in.getByte(in.readerIndex()) + 1)) + return; + messageHeader.from = CompactEndpointSerializationHelper.instance.deserialize(inputPlus, messagingVersion); + state = State.READ_SECOND_CHUNK; + readableBytes -= serializedAddrSize; + // fall-through + case READ_SECOND_CHUNK: + if (readableBytes < SECOND_SECTION_BYTE_COUNT) + return; + messageHeader.verb = MessagingService.Verb.fromId(in.readInt()); + int paramCount = in.readInt(); + messageHeader.parameterCount = paramCount; + messageHeader.parameters = paramCount == 0 ? Collections.emptyMap() : new HashMap<>(); + state = State.READ_PARAMETERS_DATA; + readableBytes -= SECOND_SECTION_BYTE_COUNT; + // fall-through + case READ_PARAMETERS_DATA: + if (messageHeader.parameterCount > 0) + { + if (!readParameters(in, inputPlus, messageHeader.parameterCount, messageHeader.parameters)) + return; + readableBytes = in.readableBytes(); // we read an indeterminate number of bytes for the headers, so just ask the buffer again + } + state = State.READ_PAYLOAD_SIZE; + // fall-through + case READ_PAYLOAD_SIZE: + if (readableBytes < 4) + return; + messageHeader.payloadSize = in.readInt(); + state = State.READ_PAYLOAD; + readableBytes -= 4; + // fall-through + case READ_PAYLOAD: + if (readableBytes < messageHeader.payloadSize) + return; + + // TODO consider deserailizing the messge not on the event loop + MessageIn<Object> messageIn = MessageIn.read(inputPlus, messagingVersion, + messageHeader.messageId, messageHeader.constructionTime, messageHeader.from, + messageHeader.payloadSize, messageHeader.verb, messageHeader.parameters); + + if (messageIn != null) + messageConsumer.accept(messageIn, messageHeader.messageId); + + state = State.READ_FIRST_CHUNK; + messageHeader = null; + break; + default: + throw new IllegalStateException("unknown/unhandled state: " + state); + } + } + } + catch (Exception e) + { + exceptionCaught(ctx, e); + } + } + + /** + * @return <code>true</code> if all the parameters have been read from the {@link ByteBuf}; else, <code>false</code>. 
+ */ + private boolean readParameters(ByteBuf in, ByteBufDataInputPlus inputPlus, int parameterCount, Map<ParameterType, Object> parameters) throws IOException + { + // makes the assumption that map.size() is a constant time function (HashMap.size() is) + while (parameters.size() < parameterCount) + { + if (!canReadNextParam(in)) + return false; + + String key = DataInputStream.readUTF(inputPlus); + ParameterType parameterType = ParameterType.byName.get(key); + byte[] value = new byte[in.readInt()]; + in.readBytes(value); - parameters.put(parameterType, parameterType.serializer.deserialize(new DataInputBuffer(value), messagingVersion)); ++ try (DataInputBuffer buffer = new DataInputBuffer(value)) ++ { ++ parameters.put(parameterType, parameterType.serializer.deserialize(buffer, messagingVersion)); ++ } + } + + return true; + } + + /** + * Determine if we can read the next parameter from the {@link ByteBuf}. This method will *always* set the {@code in} + * readIndex back to where it was when this method was invoked. + * + * NOTE: this function would be sooo much simpler if we included a parameters length int in the messaging format, + * instead of checking the remaining readable bytes for each field as we're parsing it. c'est la vie ... + */ + @VisibleForTesting + static boolean canReadNextParam(ByteBuf in) + { + in.markReaderIndex(); + // capture the readableBytes value here to avoid all the virtual function calls. + // subtract 6 as we know we'll be reading a short and an int (for the utf and value lengths). + final int minimumBytesRequired = 6; + int readableBytes = in.readableBytes() - minimumBytesRequired; + if (readableBytes < 0) + return false; + + // this is a tad invasive, but since we know the UTF string is prefaced with a 2-byte length, + // read that to make sure we have enough bytes to read the string itself. + short strLen = in.readShort(); + // check if we can read that many bytes for the UTF + if (strLen > readableBytes) + { + in.resetReaderIndex(); + return false; + } + in.skipBytes(strLen); + readableBytes -= strLen; + + // check if we can read the value length + if (readableBytes < 4) + { + in.resetReaderIndex(); + return false; + } + int valueLength = in.readInt(); + // check if we read that many bytes for the value + if (valueLength > readableBytes) + { + in.resetReaderIndex(); + return false; + } + + in.resetReaderIndex(); + return true; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) + { + if (cause instanceof EOFException) + logger.trace("eof reading from socket; closing", cause); + else if (cause instanceof UnknownTableException) + logger.warn("Got message from unknown table while reading from socket; closing", cause); + else if (cause instanceof IOException) + logger.trace("IOException reading from socket; closing", cause); + else + logger.warn("Unexpected exception caught in inbound channel pipeline from " + ctx.channel().remoteAddress(), cause); + + ctx.close(); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception + { + logger.debug("received channel closed message for peer {} on local addr {}", ctx.channel().remoteAddress(), ctx.channel().localAddress()); + ctx.fireChannelInactive(); + } + + // should ony be used for testing!!! + @VisibleForTesting + MessageHeader getMessageHeader() + { + return messageHeader; + } + + /** + * A simple struct to hold the message header data as it is being built up. 
+ */ + static class MessageHeader + { + int messageId; + long constructionTime; + InetAddressAndPort from; + MessagingService.Verb verb; + int payloadSize; + + Map<ParameterType, Object> parameters = Collections.emptyMap(); + + /** + * Total number of incoming parameters. + */ + int parameterCount; + } +}
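The MessageIn.read and MessageInHandler.readParameters hunks above replace a bare new DataInputBuffer(value) with a try-with-resources block, so the temporary buffer wrapping the already-read byte[] is closed even when the parameter serializer throws. A minimal JDK-only sketch of that idiom follows; ParameterReadSketch, readParameterValue and the fixed-width long payload are illustrative stand-ins, not part of the commit:

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;

    public class ParameterReadSketch
    {
        /**
         * Deserialize a single value from an already-read byte[] while guaranteeing
         * the wrapping stream is closed, mirroring the try-with-resources change
         * made around DataInputBuffer in this commit.
         */
        static long readParameterValue(byte[] value) throws IOException
        {
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(value)))
            {
                // the real code delegates to type.serializer.deserialize(buffer, version);
                // a fixed-width long stands in for that here
                return in.readLong();
            }
            // no explicit close() needed: the stream is closed on both the normal
            // and the exceptional path
        }

        public static void main(String[] args) throws IOException
        {
            byte[] value = { 0, 0, 0, 0, 0, 0, 0, 42 };
            System.out.println(readParameterValue(value)); // prints 42
        }
    }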

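The two @SuppressWarnings("resource") annotations added to CassandraKeyspaceWriteHandler are justified by the comment in the diff: the OpOrder.Group opened in beginWrite and createEmptyContext is owned by the returned CassandraWriteContext and released when that context is closed, so the handler itself only closes the group on the failure path. A compact sketch of that ownership hand-off, with hypothetical WriteGroup, WriteContextSketch and WriteHandlerSketch types standing in for OpOrder.Group, CassandraWriteContext and the handler:

    /** Stand-in for OpOrder.Group: a resource that must be closed exactly once. */
    class WriteGroup implements AutoCloseable
    {
        public void close() { /* release the operation group */ }
    }

    /** Stand-in for CassandraWriteContext: owns the group and closes it on close(). */
    class WriteContextSketch implements AutoCloseable
    {
        private final WriteGroup group;

        WriteContextSketch(WriteGroup group) { this.group = group; }

        public void close() { group.close(); }
    }

    class WriteHandlerSketch
    {
        /**
         * Opens a group and hands ownership to the returned context. The group is
         * closed here only if construction fails; on success the caller releases it
         * by closing the context, which is why the real code can suppress the
         * "resource" warning on this method.
         */
        WriteContextSketch beginWrite()
        {
            WriteGroup group = null;
            try
            {
                group = new WriteGroup();
                return new WriteContextSketch(group);
            }
            catch (Throwable t)
            {
                if (group != null)
                    group.close();
                throw t;
            }
        }
    }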