Repository: cassandra
Updated Branches:
  refs/heads/trunk 230cca580 -> 0352a15a3
Restore performance of writeUTF; follow up commit to CASSANDRA-8670 patch by ariel and benedict for CASSANDRA-8670 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0352a15a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0352a15a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0352a15a Branch: refs/heads/trunk Commit: 0352a15a318e8121f8ec977d28379961a9aec387 Parents: 230cca5 Author: Ariel Weisberg <[email protected]> Authored: Tue Mar 31 22:21:43 2015 +0100 Committer: Benedict Elliott Smith <[email protected]> Committed: Tue Mar 31 22:21:43 2015 +0100 ---------------------------------------------------------------------- .../cassandra/io/util/DataOutputStreamPlus.java | 2 +- .../io/util/UnbufferedDataOutputStreamPlus.java | 99 +++---- .../test/microbench/OutputStreamBench.java | 274 +++++++++++++++++++ .../io/util/BufferedDataOutputStreamTest.java | 26 +- 4 files changed, 342 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0352a15a/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java index 6de2879..a846384 100644 --- a/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java +++ b/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java @@ -55,7 +55,7 @@ public abstract class DataOutputStreamPlus extends OutputStream implements DataO protected static byte[] retrieveTemporaryBuffer(int minSize) { byte[] bytes = tempBuffer.get(); - if (bytes.length < minSize) + if (bytes.length < Math.min(minSize, MAX_BUFFER_SIZE)) { // increase in powers of 2, to avoid wasted repeat allocations bytes = new byte[Math.min(MAX_BUFFER_SIZE, 2 * 
Integer.highestOneBit(minSize))]; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0352a15a/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java index 31abfa8..ac3bae5 100644 --- a/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java +++ b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java @@ -252,81 +252,72 @@ public abstract class UnbufferedDataOutputStreamPlus extends DataOutputStreamPlu public static void writeUTF(String str, DataOutput out) throws IOException { int length = str.length(); - int utfCount = calculateUTFLength(str, length); + int utfCount = 0; + for (int i = 0 ; i < length ; i++) + { + int ch = str.charAt(i); + if ((ch > 0) & (ch <= 127)) + utfCount += 1; + else if (ch <= 2047) + utfCount += 2; + else + utfCount += 3; + } if (utfCount > 65535) throw new UTFDataFormatException(); //$NON-NLS-1$ byte[] utfBytes = retrieveTemporaryBuffer(utfCount + 2); - int utfIndex = 2; - utfBytes[0] = (byte) (utfCount >> 8); - utfBytes[1] = (byte) utfCount; int bufferLength = utfBytes.length; - - if (utfCount == length && utfCount + utfIndex < bufferLength) + if (utfCount == length) { - for (int charIndex = 0 ; charIndex < length ; charIndex++) - utfBytes[utfIndex++] = (byte) str.charAt(charIndex); + utfBytes[0] = (byte) (utfCount >> 8); + utfBytes[1] = (byte) utfCount; + int firstIndex = 2; + for (int offset = 0 ; offset < length ; offset += bufferLength) + { + int runLength = Math.min(bufferLength - firstIndex, length - offset) + firstIndex; + offset -= firstIndex; + for (int i = firstIndex ; i < runLength; i++) + utfBytes[i] = (byte) str.charAt(offset + i); + out.write(utfBytes, 0, runLength); + offset += firstIndex; + firstIndex = 0; + } } else { - int charIndex 
= 0; - while (charIndex < length) + int utfIndex = 2; + utfBytes[0] = (byte) (utfCount >> 8); + utfBytes[1] = (byte) utfCount; + for (int charIndex = 0 ; charIndex < length ; charIndex++) { - char ch = str.charAt(charIndex); - int sizeOfChar = sizeOfChar(ch); - if (utfIndex + sizeOfChar > bufferLength) + if (utfIndex + 3 > bufferLength) { out.write(utfBytes, 0, utfIndex); utfIndex = 0; } - switch (sizeOfChar) + char ch = str.charAt(charIndex); + if ((ch > 0) & (ch <= 127)) + { + utfBytes[utfIndex++] = (byte) ch; + } + else if (ch <= 2047) { - case 3: - utfBytes[utfIndex] = (byte) (0xe0 | (0x0f & (ch >> 12))); - utfBytes[utfIndex + 1] = (byte) (0x80 | (0x3f & (ch >> 6))); - utfBytes[utfIndex + 2] = (byte) (0x80 | (0x3f & ch)); - break; - case 2: - utfBytes[utfIndex] = (byte) (0xc0 | (0x1f & (ch >> 6))); - utfBytes[utfIndex + 1] = (byte) (0x80 | (0x3f & ch)); - break; - case 1: - utfBytes[utfIndex] = (byte) ch; - break; - default: - throw new IllegalStateException(); + utfBytes[utfIndex++] = (byte) (0xc0 | (0x1f & (ch >> 6))); + utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & ch)); + } + else + { + utfBytes[utfIndex++] = (byte) (0xe0 | (0x0f & (ch >> 12))); + utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & (ch >> 6))); + utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & ch)); } - utfIndex += sizeOfChar; - charIndex++; } + out.write(utfBytes, 0, utfIndex); } - out.write(utfBytes, 0, utfIndex); - } - - /* - * Factored out into separate method to create more flexibility around inlining - */ - private static int calculateUTFLength(String str, int length) - { - int utfCount = 0; - for (int i = 0; i < length; i++) - utfCount += sizeOfChar(str.charAt(i)); - return utfCount; - } - - private static int sizeOfChar(int ch) - { - // wrap 0 around to max, because it requires 3 bytes - return 1 - // if >= 128, we need an extra byte, so we divide by 128 and check the value is > 0 - // (by negating it and taking the sign bit) - + (-(ch / 128) >>> 31) - // if >= 2048, or == 0, we need 
another extra byte; we subtract one and wrap around, - // so we only then need to confirm it is greater than 2047 - + (-(((ch - 1) & 0xffff) / 2047) >>> 31); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/0352a15a/test/microbench/org/apache/cassandra/test/microbench/OutputStreamBench.java ---------------------------------------------------------------------- diff --git a/test/microbench/org/apache/cassandra/test/microbench/OutputStreamBench.java b/test/microbench/org/apache/cassandra/test/microbench/OutputStreamBench.java new file mode 100644 index 0000000..b8136f7 --- /dev/null +++ b/test/microbench/org/apache/cassandra/test/microbench/OutputStreamBench.java @@ -0,0 +1,274 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.test.microbench; + +import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus; +import org.apache.cassandra.io.util.BufferedDataOutputStreamTest; +import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus; +import org.openjdk.jmh.annotations.*; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.concurrent.TimeUnit; + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(value = 3,jvmArgsAppend = "-Xmx512M") +@Threads(1) +@State(Scope.Benchmark) +public class OutputStreamBench +{ + + BufferedOutputStream hole = new BufferedOutputStream(new OutputStream() { + + @Override + public void write(int b) throws IOException + { + + } + + @Override + public void write(byte b[]) throws IOException { + + } + + @Override + public void write(byte b[], int a, int c) throws IOException { + + } + }); + + WrappedDataOutputStreamPlus streamA = new WrappedDataOutputStreamPlus(hole); + + BufferedDataOutputStreamPlus streamB = new BufferedDataOutputStreamPlus(new WritableByteChannel() { + + @Override + public boolean isOpen() + { + // TODO Auto-generated method stub + return true; + } + + @Override + public void close() throws IOException + { + // TODO Auto-generated method stub + + } + + @Override + public int write(ByteBuffer src) throws IOException + { + int remaining = src.remaining(); + src.position(src.limit()); + return remaining; + } + + }, 8192); + + public static byte foo; + + public static int foo1; + + public static long foo2; + + public static double foo3; + + public static float foo4; + + public static short foo5; + + public static char foo6; + + @Benchmark + public void testBOSByte() throws IOException + { + streamA.write(foo); 
+ } + + @Benchmark + public void testBDOSPByte() throws IOException + { + streamB.write(foo); + } + + @Benchmark + public void testBOSInt() throws IOException + { + streamA.writeInt(foo1); + } + + @Benchmark + public void testBDOSPInt() throws IOException + { + streamB.writeInt(foo1); + } + + @Benchmark + public void testBOSLong() throws IOException + { + streamA.writeLong(foo2); + } + + @Benchmark + public void testBDOSPLong() throws IOException + { + streamB.writeLong(foo2); + } + + @Benchmark + public void testBOSMixed() throws IOException + { + streamA.write(foo); + streamA.writeInt(foo1); + streamA.writeLong(foo2); + streamA.writeDouble(foo3); + streamA.writeFloat(foo4); + streamA.writeShort(foo5); + streamA.writeChar(foo6); + } + + @Benchmark + public void testBDOSPMixed() throws IOException + { + streamB.write(foo); + streamB.writeInt(foo1); + streamB.writeLong(foo2); + streamB.writeDouble(foo3); + streamB.writeFloat(foo4); + streamB.writeShort(foo5); + streamB.writeChar(foo6); + } + + public static String tinyM = "ð ¹"; + public static String smallM = "ð ¹ã¨Æð ¹ã¨Æð ¹ã¨Æð ¹ã¨Æð ¹ã¨Æð ¹ã¨Æð ¹ã¨Æð ¹ã¨Æð ¹ã¨Æð ¹ã¨Æð ¹ã¨Æ"; + public static String largeM; + public static String tiny = "a"; + public static String small = "adsjglhnafsjk;gujfakyhgukafshgjkahfsgjkhafs;jkhausjkgaksfj;gafskdghajfsk;g"; + public static String large; + + static { + StringBuilder sb = new StringBuilder(); + while (sb.length() < 1024 * 12) { + sb.append(small); + } + large = sb.toString(); + + sb = new StringBuilder(); + while (sb.length() < 1024 * 12) { + sb.append(smallM); + } + largeM = sb.toString(); + } + + @Benchmark + public void testMTinyStringBOS() throws IOException { + streamA.writeUTF(tinyM); + } + + @Benchmark + public void testMTinyStringBDOSP() throws IOException { + streamB.writeUTF(tinyM); + } + + @Benchmark + public void testMTinyLegacyWriteUTF() throws IOException { + BufferedDataOutputStreamTest.writeUTFLegacy(tinyM, hole); + } + + @Benchmark + public void 
testMSmallStringBOS() throws IOException { + streamA.writeUTF(smallM); + } + + @Benchmark + public void testMSmallStringBDOSP() throws IOException { + streamB.writeUTF(smallM); + } + + @Benchmark + public void testMSmallLegacyWriteUTF() throws IOException { + BufferedDataOutputStreamTest.writeUTFLegacy(smallM, hole); + } + + @Benchmark + public void testMLargeStringBOS() throws IOException { + streamA.writeUTF(largeM); + } + + @Benchmark + public void testMLargeStringBDOSP() throws IOException { + streamB.writeUTF(largeM); + } + + @Benchmark + public void testMLargeLegacyWriteUTF() throws IOException { + BufferedDataOutputStreamTest.writeUTFLegacy(largeM, hole); + } + + @Benchmark + public void testTinyStringBOS() throws IOException { + streamA.writeUTF(tiny); + } + + @Benchmark + public void testTinyStringBDOSP() throws IOException { + streamB.writeUTF(tiny); + } + + @Benchmark + public void testTinyLegacyWriteUTF() throws IOException { + BufferedDataOutputStreamTest.writeUTFLegacy(tiny, hole); + } + + @Benchmark + public void testSmallStringBOS() throws IOException { + streamA.writeUTF(small); + } + + @Benchmark + public void testSmallStringBDOSP() throws IOException { + streamB.writeUTF(small); + } + + @Benchmark + public void testSmallLegacyWriteUTF() throws IOException { + BufferedDataOutputStreamTest.writeUTFLegacy(small, hole); + } + + @Benchmark + public void testRLargeStringBOS() throws IOException { + streamA.writeUTF(large); + } + + @Benchmark + public void testRLargeStringBDOSP() throws IOException { + streamB.writeUTF(large); + } + + @Benchmark + public void testRLargeLegacyWriteUTF() throws IOException { + BufferedDataOutputStreamTest.writeUTFLegacy(large, hole); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/0352a15a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java ---------------------------------------------------------------------- diff --git 
a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java index 8ac6d92..8eaea31 100644 --- a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java +++ b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java @@ -3,6 +3,7 @@ package org.apache.cassandra.io.util; import java.io.ByteArrayOutputStream; import java.io.DataOutput; import java.io.IOException; +import java.io.OutputStream; import java.io.UTFDataFormatException; import java.lang.reflect.Field; import java.nio.ByteBuffer; @@ -148,7 +149,7 @@ public class BufferedDataOutputStreamTest int action = 0; while (generated.size() < 1024 * 1024 * 8) { - action = r.nextInt(18); + action = r.nextInt(19); //System.out.println("Action " + action + " iteration " + iteration); iteration++; @@ -258,6 +259,9 @@ public class BufferedDataOutputStreamTest { StringBuilder sb = new StringBuilder(); int length = r.nextInt(500); + //Some times do big strings + if (r.nextDouble() > .95) + length += 4000; sb.append(simple + twoByte + threeByte + fourByte); for (int ii = 0; ii < length; ii++) { @@ -270,6 +274,20 @@ public class BufferedDataOutputStreamTest } case 15: { + StringBuilder sb = new StringBuilder(); + int length = r.nextInt(500); + sb.append("the very model of a modern major general familiar with all things animal vegetable and mineral"); + for (int ii = 0; ii < length; ii++) + { + sb.append(' '); + } + String str = sb.toString(); + writeUTFLegacy(str, dosp); + ndosp.writeUTF(str); + break; + } + case 16: + { ByteBuffer buf = ByteBuffer.allocate(r.nextInt(1024 * 8 + 1)); r.nextBytes(buf.array()); buf.position(buf.capacity() == 0 ? 
0 : r.nextInt(buf.capacity())); @@ -281,7 +299,7 @@ public class BufferedDataOutputStreamTest dosp.write(buf.duplicate()); break; } - case 16: + case 17: { ByteBuffer buf = ByteBuffer.allocateDirect(r.nextInt(1024 * 8 + 1)); while (buf.hasRemaining()) @@ -295,7 +313,7 @@ public class BufferedDataOutputStreamTest dosp.write(buf.duplicate()); break; } - case 17: + case 18: { try (Memory buf = Memory.allocate(r.nextInt(1024 * 8 - 1) + 1);) { @@ -317,7 +335,7 @@ public class BufferedDataOutputStreamTest assertSameOutput(0, -1, iteration); } - static void writeUTFLegacy(String str, DataOutput out) throws IOException + public static void writeUTFLegacy(String str, OutputStream out) throws IOException { int utfCount = 0, length = str.length(); for (int i = 0; i < length; i++)
