Repository: thrift Updated Branches: refs/heads/master f5b795d3b -> 664dd0a01
THRIFT-123: TZlibTransport for Java Client: Java Patch: Dragan Okiljevic, Keith Chew, Randy Abernethy Adds a Java 1.7 based TZlibTransport to the Java library. Project: http://git-wip-us.apache.org/repos/asf/thrift/repo Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/664dd0a0 Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/664dd0a0 Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/664dd0a0 Branch: refs/heads/master Commit: 664dd0a0130546d59e7398e72c1af09c692e5a6f Parents: f5b795d Author: Randy Abernethy <[email protected]> Authored: Sun Mar 29 10:10:02 2015 -0700 Committer: Randy Abernethy <[email protected]> Committed: Sun Mar 29 10:10:02 2015 -0700 ---------------------------------------------------------------------- .../apache/thrift/transport/TZlibTransport.java | 181 +++++++++++++++++++ 1 file changed, 181 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/thrift/blob/664dd0a0/lib/java/src/org/apache/thrift/transport/TZlibTransport.java ---------------------------------------------------------------------- diff --git a/lib/java/src/org/apache/thrift/transport/TZlibTransport.java b/lib/java/src/org/apache/thrift/transport/TZlibTransport.java new file mode 100644 index 0000000..25c9d01 --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/TZlibTransport.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.thrift.transport; + +import java.util.zip.Deflater; +import java.util.zip.Inflater; +import org.apache.thrift.TByteArrayOutputStream; +import org.apache.thrift.transport.TMemoryInputTransport; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.apache.thrift.transport.TTransportFactory; + +/** + * TZlibTransport deflates on write and inflates on read. + */ +public class TZlibTransport extends TTransport { + //Class constants + public static final int INFLATE_BUF_SIZE = 1024; + public static final int READ_BUF_SIZE = 1024; + public static final int INIT_WRITE_BUF_SIZE = 1024; + //Client rw buffers and underlying transport + private TByteArrayOutputStream writeBuffer_ = new TByteArrayOutputStream(INIT_WRITE_BUF_SIZE); + private TMemoryInputTransport readBuffer_ = new TMemoryInputTransport(new byte[0]); + private TTransport transport_ = null; + //Zip objects and buffers + private byte[] inflateBuf = new byte[INFLATE_BUF_SIZE]; + private byte[] readBuf = new byte[READ_BUF_SIZE]; + private Inflater decompresser = new Inflater(false); + private Deflater compresser = new Deflater(Deflater.BEST_COMPRESSION, false); + + public static class Factory extends TTransportFactory { + public Factory() { + } + + @Override + public TTransport getTransport(TTransport base) { + return new TZlibTransport(base); + } + } + + /** + * Constructs a new TZlibTransport instance. + * @param transport the underlying transport to read from and write to + */ + public TZlibTransport(TTransport transport) { + transport_ = transport; + } + + /** + * Constructs a new TZlibTransport instance. + * @param transport the underlying transport to read from and write to + * @param compressionLevel 0 for no compression, 9 for maximum compression + */ + public TZlibTransport(TTransport transport, int compressionLevel) { + transport_ = transport; + compresser = new Deflater(compressionLevel, false); + } + + @Override + public void open() throws TTransportException { + transport_.open(); + } + + @Override + public boolean isOpen() { + return transport_.isOpen(); + } + + @Override + public void close() { + readBuffer_.reset(new byte[0]); + writeBuffer_.reset(); + compresser.reset(); + decompresser.reset(); + transport_.close(); + } + + @Override + public int read(byte[] buf, int off, int len) throws TTransportException { + int bytesRead = readBuffer_.read(buf, off, len); + if (bytesRead > 0) { + return bytesRead; + } + + while (true) { + if (readComp() > 0) { + break; + } + } + + return readBuffer_.read(buf, off, len); + } + + private int readComp() throws TTransportException { + //If low level read buffer is exhausted, read more bytes from underlying transport + if (decompresser.needsInput()) { + int bytesRead = transport_.read(readBuf, 0, READ_BUF_SIZE); + decompresser.setInput(readBuf, 0, bytesRead); + } + //Decompress bytes into high level client read buffer + try { + int InflatedBytes = decompresser.inflate(inflateBuf); + if (InflatedBytes <= 0) { + return 0; + } + + byte[] old = new byte[readBuffer_.getBytesRemainingInBuffer()]; + readBuffer_.read(old, 0, readBuffer_.getBytesRemainingInBuffer()); + byte[] all = new byte[old.length + InflatedBytes]; + System.arraycopy(old, 0, all, 0, old.length); + System.arraycopy(inflateBuf, 0, all, old.length, InflatedBytes); + + readBuffer_.reset(all); + return all.length; + } catch (java.util.zip.DataFormatException ex) { + throw new TTransportException(ex); + } + } + + @Override + public byte[] getBuffer() { + return readBuffer_.getBuffer(); + } + + @Override + public int getBufferPosition() { + return readBuffer_.getBufferPosition(); + } + + @Override + public int getBytesRemainingInBuffer() { + return readBuffer_.getBytesRemainingInBuffer(); + } + + @Override + public void consumeBuffer(int len) { + readBuffer_.consumeBuffer(len); + } + + @Override + public void write(byte[] buf, int off, int len) throws TTransportException { + writeBuffer_.write(buf, off, len); + } + + /** + * Compress write buffer and send it to underlying transport. + */ + @Override + public void flush() throws TTransportException { + byte[] buf = writeBuffer_.get(); + writeBuffer_.reset(); + compresser.setInput(buf); + + byte[] compBuf = new byte[buf.length * 2]; + int compressedDataLength = compresser.deflate(compBuf, 0, compBuf.length, Deflater.SYNC_FLUSH); + if (compressedDataLength >= compBuf.length) { + throw new TTransportException("Compression error, compressed output exceeds buffer size"); + } + if (compressedDataLength > 0) { + transport_.write(compBuf, 0, compressedDataLength); + transport_.flush(); + } + } +} +
