clean up DataOutputBuffer
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9252ee95 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9252ee95 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9252ee95 Branch: refs/heads/trunk Commit: 9252ee95b9f8e30f94ffb2eaf3a3828dadee3610 Parents: b4af12b Author: Jonathan Ellis <[email protected]> Authored: Wed Feb 29 13:59:20 2012 -0600 Committer: Jonathan Ellis <[email protected]> Committed: Tue Mar 27 09:51:53 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/cassandra/db/marshal/CompositeType.java | 92 ++++++--------- .../apache/cassandra/io/util/DataOutputBuffer.java | 46 +++++--- .../org/apache/cassandra/io/util/OutputBuffer.java | 73 ------------ .../apache/cassandra/thrift/CassandraServer.java | 19 +-- 5 files changed, 76 insertions(+), 156 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9252ee95/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8355543..052d357 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 1.1.1-dev + * clean up and optimize DataOutputBuffer, used by CQL compression and + CompositeType (CASSANDRA-4072) * optimize commitlog checksumming (CASSANDRA-3610) * identify and blacklist corrupted SSTables from future compactions (CASSANDRA-2261) * Move CfDef and KsDef validation out of thrift (CASSANDRA-4037) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9252ee95/src/java/org/apache/cassandra/db/marshal/CompositeType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java index a3e9e98..67323bf 100644 --- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java +++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java @@ -18,17 +18,18 @@ */ package org.apache.cassandra.db.marshal; -import java.io.*; import java.nio.ByteBuffer; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.cql3.ColumnNameBuilder; import org.apache.cassandra.cql3.Relation; import org.apache.cassandra.cql3.Term; -import org.apache.cassandra.config.ConfigurationException; -import org.apache.cassandra.io.util.FastByteArrayOutputStream; -import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.utils.ByteBufferUtil; /* * The encoding of a CompositeType column name should be: @@ -171,8 +172,7 @@ public class CompositeType extends AbstractCompositeType private final CompositeType composite; private int current; - private final FastByteArrayOutputStream baos = new FastByteArrayOutputStream(); - private final DataOutput out = new DataOutputStream(baos); + private final DataOutputBuffer out = new DataOutputBuffer(); public Builder(CompositeType composite) { @@ -183,14 +183,7 @@ public class CompositeType extends AbstractCompositeType { this(b.composite); this.current = b.current; - try - { - out.write(b.baos.toByteArray()); - } - catch (IOException e) - { - throw new RuntimeException(e); - } + out.write(b.out.getData(), 0, b.out.getLength()); } public Builder add(Term t, Relation.Type op, List<ByteBuffer> variables) throws InvalidRequestException @@ -200,38 +193,31 @@ public class CompositeType extends AbstractCompositeType AbstractType currentType = composite.types.get(current++); ByteBuffer buffer = t.getByteBuffer(currentType, variables); - try - { - ByteBufferUtil.writeWithShortLength(buffer, out); - - /* - * Given the rules for eoc (end-of-component, see AbstractCompositeType.compare()), - * We can select: - * - = 'a' by using <'a'><0> - * - < 'a' by using <'a'><-1> - * - <= 'a' by using <'a'><1> - * - > 'a' by using <'a'><1> - * - >= 'a' by using <'a'><0> - */ - switch (op) - { - case LT: - out.write((byte) -1); - break; - case GT: - case LTE: - out.write((byte) 1); - break; - default: - out.write((byte) 0); - break; - } - return this; - } - catch (IOException e) + ByteBufferUtil.writeWithShortLength(buffer, out); + + /* + * Given the rules for eoc (end-of-component, see AbstractCompositeType.compare()), + * We can select: + * - = 'a' by using <'a'><0> + * - < 'a' by using <'a'><-1> + * - <= 'a' by using <'a'><1> + * - > 'a' by using <'a'><1> + * - >= 'a' by using <'a'><0> + */ + switch (op) { - throw new RuntimeException(e); + case LT: + out.write((byte) -1); + break; + case GT: + case LTE: + out.write((byte) 1); + break; + default: + out.write((byte) 0); + break; } + return this; } public Builder add(ByteBuffer bb) @@ -239,16 +225,9 @@ public class CompositeType extends AbstractCompositeType if (current >= composite.types.size()) throw new IllegalStateException("Composite column is already fully constructed"); - try - { - ByteBufferUtil.writeWithShortLength(bb, out); - out.write((byte) 0); - return this; - } - catch (IOException e) - { - throw new RuntimeException(e); - } + ByteBufferUtil.writeWithShortLength(bb, out); + out.write((byte) 0); + return this; } public int componentCount() @@ -258,7 +237,8 @@ public class CompositeType extends AbstractCompositeType public ByteBuffer build() { - return ByteBuffer.wrap(baos.toByteArray()); + // potentially slightly space-wasteful in favor of avoiding a copy + return ByteBuffer.wrap(out.getData(), 0, out.getLength()); } public ByteBuffer buildAsEndOfRange() http://git-wip-us.apache.org/repos/asf/cassandra/blob/9252ee95/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java index 6248657..93b9bc5 100644 --- a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java +++ b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java @@ -19,11 +19,14 @@ package org.apache.cassandra.io.util; import java.io.DataOutputStream; +import java.io.IOException; /** - * An implementation of the DataOutputStream interface. This class is completely thread - * unsafe. + * An implementation of the DataOutputStream interface using a FastByteArrayOutputStream and exposing + * its buffer so copies can be avoided. + * + * This class is completely thread unsafe. */ public final class DataOutputBuffer extends DataOutputStream { @@ -34,12 +37,33 @@ public final class DataOutputBuffer extends DataOutputStream public DataOutputBuffer(int size) { - super(new OutputBuffer(size)); + super(new FastByteArrayOutputStream(size)); + } + + @Override + public void write(int b) + { + try + { + super.write(b); + } + catch (IOException e) + { + throw new AssertionError(e); // FBOS does not throw IOE + } } - private OutputBuffer buffer() + @Override + public void write(byte[] b, int off, int len) { - return (OutputBuffer)out; + try + { + super.write(b, off, len); + } + catch (IOException e) + { + throw new AssertionError(e); // FBOS does not throw IOE + } } /** @@ -48,20 +72,12 @@ public final class DataOutputBuffer extends DataOutputStream */ public byte[] getData() { - return buffer().getData(); + return ((FastByteArrayOutputStream) out).buf; } /** Returns the length of the valid data currently in the buffer. */ public int getLength() { - return buffer().getLength(); - } - - /** Resets the buffer to empty. */ - public DataOutputBuffer reset() - { - this.written = 0; - buffer().reset(); - return this; + return ((FastByteArrayOutputStream) out).count; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9252ee95/src/java/org/apache/cassandra/io/util/OutputBuffer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/OutputBuffer.java b/src/java/org/apache/cassandra/io/util/OutputBuffer.java deleted file mode 100644 index 2a64430..0000000 --- a/src/java/org/apache/cassandra/io/util/OutputBuffer.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.io.util; - -import java.io.DataInput; -import java.io.IOException; - -/** - * Extends FastByteArrayOutputStream to minimize copies. - */ -public final class OutputBuffer extends FastByteArrayOutputStream -{ - public OutputBuffer() - { - this(128); - } - - public OutputBuffer(int size) - { - super(size); - } - - public byte[] getData() - { - return buf; - } - - public int getLength() - { - return count; - } - - public void write(DataInput in, int len) throws IOException - { - int newcount = count + len; - if (newcount > buf.length) - { - byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)]; - System.arraycopy(buf, 0, newbuf, 0, count); - buf = newbuf; - } - in.readFully(buf, count, len); - count = newcount; - } - - /** - * @return The valid contents of the buffer, possibly by copying: only safe for one-time-use buffers. - */ - public byte[] asByteArray() - { - if (count == buf.length) - // no-copy - return buf; - // copy - return this.toByteArray(); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9252ee95/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index 5fd3a45..27c7cb9 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -24,7 +24,6 @@ import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.util.*; -import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; import java.util.zip.DataFormatException; import java.util.zip.Inflater; @@ -40,18 +39,14 @@ import org.apache.cassandra.config.*; import org.apache.cassandra.cql.CQLStatement; import org.apache.cassandra.cql.QueryProcessor; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.db.marshal.MarshalException; -import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.dht.*; -import org.apache.cassandra.io.util.FastByteArrayOutputStream; -import org.apache.cassandra.locator.*; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.locator.DynamicEndpointSnitch; import org.apache.cassandra.scheduler.IRequestScheduler; -import org.apache.cassandra.service.ClientState; -import org.apache.cassandra.service.MigrationManager; -import org.apache.cassandra.service.SocketSessionManagementService; -import org.apache.cassandra.service.StorageProxy; -import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.*; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.thrift.TException; @@ -1170,7 +1165,7 @@ public class CassandraServer implements Cassandra.Iface switch (compression) { case GZIP: - FastByteArrayOutputStream byteArray = new FastByteArrayOutputStream(); + DataOutputBuffer decompressed = new DataOutputBuffer(); byte[] outBuffer = new byte[1024], inBuffer = new byte[1024]; Inflater decompressor = new Inflater(); @@ -1185,7 +1180,7 @@ public class CassandraServer implements Cassandra.Iface int lenWrite = 0; while ((lenWrite = decompressor.inflate(outBuffer)) !=0) - byteArray.write(outBuffer, 0, lenWrite); + decompressed.write(outBuffer, 0, lenWrite); if (decompressor.finished()) break; @@ -1193,7 +1188,7 @@ public class CassandraServer implements Cassandra.Iface decompressor.end(); - queryString = new String(byteArray.toByteArray(), 0, byteArray.size(), "UTF-8"); + queryString = new String(decompressed.getData(), 0, decompressed.size(), "UTF-8"); break; case NONE: try
