This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch auto_compressor in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b1ecc6ddbbfab00a9d7a3fb9c5f3816bbc58c9bd Author: Tian Jiang <[email protected]> AuthorDate: Mon May 29 11:01:08 2023 +0800 add auto compressor --- .../apache/iotdb/tsfile/compress/ICompressor.java | 3 + .../iotdb/tsfile/compress/auto/AutoCompressor.java | 116 ++++++++++ .../tsfile/compress/auto/AutoUncompressor.java | 74 +++++++ .../tsfile/compress/auto/CompressionSampler.java | 246 +++++++++++++++++++++ .../file/metadata/enums/CompressionType.java | 5 +- 5 files changed, 443 insertions(+), 1 deletion(-) diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/ICompressor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/ICompressor.java index ed6d9bdcc59..84e2614c67d 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/ICompressor.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/ICompressor.java @@ -19,6 +19,7 @@ package org.apache.iotdb.tsfile.compress; +import org.apache.iotdb.tsfile.compress.auto.AutoCompressor; import org.apache.iotdb.tsfile.exception.compress.CompressionTypeNotSupportedException; import org.apache.iotdb.tsfile.exception.compress.GZIPCompressOverflowException; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; @@ -75,6 +76,8 @@ public interface ICompressor extends Serializable { return new ZstdCompressor(); case LZMA2: return new LZMA2Compressor(); + case AUTO: + return new AutoCompressor(); default: throw new CompressionTypeNotSupportedException(name.toString()); } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/AutoCompressor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/AutoCompressor.java new file mode 100644 index 00000000000..62f2877a534 --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/AutoCompressor.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.tsfile.compress.auto; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import org.apache.iotdb.tsfile.compress.ICompressor; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; + +public class AutoCompressor implements ICompressor { + + private CompressionSampler sampler; + + public AutoCompressor() { + List<CompressionType> compressionTypes = collectCompressionTypes(); + double alpha = 1.0; + long minSampleIntervalMS = 1000; + sampler = new CompressionSampler(compressionTypes, alpha, minSampleIntervalMS); + } + + public AutoCompressor(double alpha, long minSampleIntervalMS) { + List<CompressionType> compressionTypes = collectCompressionTypes(); + sampler = new CompressionSampler(compressionTypes, alpha, minSampleIntervalMS); + } + + private static List<CompressionType> collectCompressionTypes() { + List<CompressionType> compressionTypeList = new ArrayList<>( + CompressionType.values().length - 1); + for (CompressionType type : CompressionType.values()) { + if (!type.equals(CompressionType.AUTO) && !type.equals(CompressionType.UNCOMPRESSED)) { + compressionTypeList.add(type); + } + } + return compressionTypeList; + } + + @Override + public byte[] compress(byte[] data) throws IOException { + if (sampler.shouldSample()) { + return sampler.sample(data); + } + ICompressor preferredSampler = sampler.getPreferredSampler(); + byte[] compress = preferredSampler.compress(data); + byte[] result = new byte[compress.length + 1]; + System.arraycopy(compress, 0, result, 0, compress.length); + result[compress.length] = preferredSampler.getType().serialize(); + return result; + } + + @Override + public byte[] compress(byte[] data, int offset, int length) throws IOException { + if (sampler.shouldSample()) { + return sampler.sample(data, offset, length); + } + ICompressor preferredSampler = sampler.getPreferredSampler(); + byte[] compress = preferredSampler.compress(data, offset, length); + byte[] result = new byte[compress.length + 1]; + System.arraycopy(compress, 0, result, 0, compress.length); + result[compress.length] = preferredSampler.getType().serialize(); + return result; + } + + @Override + public int compress(byte[] data, int offset, int length, byte[] compressed) throws IOException { + if (sampler.shouldSample()) { + return sampler.sample(data, offset, length, compressed); + } + ICompressor preferredSampler = sampler.getPreferredSampler(); + int compressedLength = preferredSampler.compress(data, offset, length, compressed); + compressed[compressedLength] = preferredSampler.getType().serialize(); + return compressedLength + 1; + } + + @Override + public int compress(ByteBuffer data, ByteBuffer compressed) throws IOException { + if (sampler.shouldSample()) { + return sampler.sample(data, compressed); + } + ICompressor preferredSampler = sampler.getPreferredSampler(); + int compressedLength = preferredSampler.compress(data, compressed); + compressed.mark(); + compressed.position(compressed.position() + compressedLength); + compressed.put(preferredSampler.getType().serialize()); + compressed.reset(); + return compressedLength + 1; + } + + @Override + public int getMaxBytesForCompression(int uncompressedDataSize) { + return sampler.getMaxBytesForCompression(uncompressedDataSize); + } + + @Override + public CompressionType getType() { + return CompressionType.AUTO; + } +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/AutoUncompressor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/AutoUncompressor.java new file mode 100644 index 00000000000..037ffedabd7 --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/AutoUncompressor.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.tsfile.compress.auto; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.iotdb.tsfile.compress.IUnCompressor; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; + +public class AutoUncompressor implements IUnCompressor { + + @Override + public int getUncompressedLength(byte[] array, int offset, int length) throws IOException { + byte realType = array[offset + length - 1]; + IUnCompressor unCompressor = IUnCompressor.getUnCompressor( + CompressionType.deserialize(realType)); + return unCompressor.getUncompressedLength(array, offset, length); + } + + @Override + public int getUncompressedLength(ByteBuffer buffer) throws IOException { + byte realType = buffer.array()[buffer.position() + buffer.remaining() - 1]; + IUnCompressor unCompressor = IUnCompressor.getUnCompressor( + CompressionType.deserialize(realType)); + return unCompressor.getUncompressedLength(buffer); + } + + @Override + public byte[] uncompress(byte[] byteArray) throws IOException { + byte realType = byteArray[byteArray.length - 1]; + IUnCompressor unCompressor = IUnCompressor.getUnCompressor( + CompressionType.deserialize(realType)); + return unCompressor.uncompress(byteArray); + } + + @Override + public int uncompress(byte[] byteArray, int offset, int length, byte[] output, int outOffset) + throws IOException { + byte realType = byteArray[offset + length - 1]; + IUnCompressor unCompressor = IUnCompressor.getUnCompressor( + CompressionType.deserialize(realType)); + return unCompressor.uncompress(byteArray, offset, length, output, outOffset); + } + + @Override + public int uncompress(ByteBuffer compressed, ByteBuffer uncompressed) throws IOException { + byte realType = compressed.array()[compressed.position() + compressed.remaining() - 1]; + IUnCompressor unCompressor = IUnCompressor.getUnCompressor( + CompressionType.deserialize(realType)); + return unCompressor.uncompress(compressed, uncompressed); + } + + @Override + public CompressionType getCodecName() { + return null; + } +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/CompressionSampler.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/CompressionSampler.java new file mode 100644 index 00000000000..8358c4896e5 --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/CompressionSampler.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.tsfile.compress.auto; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import org.apache.iotdb.tsfile.compress.ICompressor; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CompressionSampler { + + private static final Logger logger = LoggerFactory.getLogger(CompressionSampler.class); + + private List<CompressionType> compressionTypes; + private long minSampleInterval; + private long lastSampleTimeMS; + private List<ICompressor> compressors; + private List<CompressionMonitor> monitors; + private int preferredCompressorIndex; + + public CompressionSampler(List<CompressionType> compressionTypes, double alpha, + long minSampleInterval) { + this.compressionTypes = compressionTypes; + this.minSampleInterval = minSampleInterval; + this.monitors = new ArrayList<>(compressionTypes.size()); + this.compressors = new ArrayList<>(compressionTypes.size()); + + int maxSampleNum = 10; + + for (CompressionType compressionType : compressionTypes) { + monitors.add(new CompressionMonitor(maxSampleNum, alpha)); + compressors.add(ICompressor.getCompressor(compressionType)); + } + } + + public boolean shouldSample() { + return System.currentTimeMillis() - lastSampleTimeMS >= minSampleInterval; + } + + public ICompressor getPreferredSampler() { + return compressors.get(preferredCompressorIndex); + } + + public byte[] sample(byte[] data) throws IOException { + return sample(data, 0, data.length); + } + + public byte[] sample(byte[] data, int offset, int length) throws IOException { + CompressionType bestType = CompressionType.UNCOMPRESSED; + int smallestLength = length; + byte[] bestResult = data; + + for (int i = 0; i < compressionTypes.size(); i++) { + ICompressor compressor = compressors.get(i); + CompressionMonitor monitor = monitors.get(i); + long startTime = System.currentTimeMillis(); + byte[] compressed = compressor.compress(data, offset, length); + int bytesBeforeCompression = data.length; + int bytesAfterCompression = compressed.length; + long timeConsumption = System.currentTimeMillis() - startTime; + monitor.addSample(bytesBeforeCompression, bytesAfterCompression, timeConsumption); + + if (bytesAfterCompression < smallestLength) { + smallestLength = bytesAfterCompression; + bestType = compressionTypes.get(i); + bestResult = compressed; + } + } + + lastSampleTimeMS = System.currentTimeMillis(); + updatePreferredIndex(); + + byte[] result = new byte[bestResult.length + 1]; + System.arraycopy(bestResult, 0, result, 0, bestResult.length); + result[bestResult.length] = bestType.serialize(); + return result; + } + + public int sample(byte[] data, int offset, int length, byte[] compressed) throws IOException { + CompressionType bestType = CompressionType.UNCOMPRESSED; + int smallestLength = length; + + for (int i = 0; i < compressionTypes.size(); i++) { + ICompressor compressor = compressors.get(i); + CompressionMonitor monitor = monitors.get(i); + long startTime = System.currentTimeMillis(); + int bytesAfterCompression = compressor.compress(data, offset, length, compressed); + int bytesBeforeCompression = data.length; + long timeConsumption = System.currentTimeMillis() - startTime; + monitor.addSample(bytesBeforeCompression, bytesAfterCompression, timeConsumption); + + if (bytesAfterCompression < smallestLength) { + smallestLength = bytesAfterCompression; + bestType = compressionTypes.get(i); + } + } + + lastSampleTimeMS = System.currentTimeMillis(); + updatePreferredIndex(); + + compressed[smallestLength] = bestType.serialize(); + return smallestLength + 1; + } + + public int sample(ByteBuffer data, ByteBuffer compressed) throws IOException { + CompressionType bestType = CompressionType.UNCOMPRESSED; + int smallestLength = data.remaining(); + + for (int i = 0; i < compressionTypes.size(); i++) { + ICompressor compressor = compressors.get(i); + CompressionMonitor monitor = monitors.get(i); + long startTime = System.currentTimeMillis(); + int bytesAfterCompression = compressor.compress(data, compressed); + int bytesBeforeCompression = data.remaining(); + long timeConsumption = System.currentTimeMillis() - startTime; + monitor.addSample(bytesBeforeCompression, bytesAfterCompression, timeConsumption); + + if (bytesAfterCompression < smallestLength) { + smallestLength = bytesAfterCompression; + bestType = compressionTypes.get(i); + } + } + + lastSampleTimeMS = System.currentTimeMillis(); + updatePreferredIndex(); + + compressed.mark(); + compressed.position(compressed.position() + smallestLength); + compressed.put(bestType.serialize()); + compressed.reset(); + return smallestLength + 1; + } + + public int getMaxBytesForCompression(int uncompressedDataSize) { + int maxBytes = 0; + for (ICompressor compressor : compressors) { + maxBytes = Math.max(maxBytes, compressor.getMaxBytesForCompression(uncompressedDataSize)); + } + return maxBytes; + } + + + private void updatePreferredIndex() { + double bestScore = 0; + int prevIndex = preferredCompressorIndex; + for (int i = 0; i < monitors.size(); i++) { + double score = monitors.get(i).score(); + if (score > bestScore) { + preferredCompressorIndex = i; + } + } + if (prevIndex != preferredCompressorIndex) { + logger.info("Preferred compressor changed to {}", compressors.get(preferredCompressorIndex)); + } + } + + private static class CompressionSample { + + private long bytesBeforeCompression; + private long bytesAfterCompression; + private long timeConsumptionNS; + + public CompressionSample(long bytesBeforeCompression, long bytesAfterCompression, + long timeConsumptionNS) { + this.bytesBeforeCompression = bytesBeforeCompression; + this.bytesAfterCompression = bytesAfterCompression; + this.timeConsumptionNS = timeConsumptionNS; + } + } + + private static class CompressionMonitor { + + private Queue<CompressionSample> samples; + private int maxSampleNum; + private double alpha; + private long bytesBeforeCompressionSum; + private long bytesAfterCompressionSum; + private long timeConsumptionSumNS; + + private CompressionMonitor(int maxSampleNum, double alpha) { + this.maxSampleNum = maxSampleNum; + this.samples = new ArrayDeque<>(maxSampleNum); + this.alpha = alpha; + } + + private double compressionRatio() { + return bytesAfterCompressionSum * 1.0 / bytesBeforeCompressionSum; + } + + private double throughput() { + return bytesBeforeCompressionSum * 1.0 / timeConsumptionSumNS; + } + + private double score() { + return Math.pow(throughput(), alpha) / compressionRatio(); + } + + private void addSample(long bytesBeforeCompression, long bytesAfterCompression, + long timeConsumptionNS) { + CompressionSample sample = new CompressionSample(bytesBeforeCompression, + bytesAfterCompression, timeConsumptionNS); + if (samples.size() < maxSampleNum) { + addSample(sample); + } else { + removeSample(); + } + } + + private void addSample(CompressionSample sample) { + bytesAfterCompressionSum += sample.bytesAfterCompression; + bytesBeforeCompressionSum += sample.bytesBeforeCompression; + timeConsumptionSumNS += sample.timeConsumptionNS; + samples.add(sample); + } + + private void removeSample() { + CompressionSample sample = samples.remove(); + bytesBeforeCompressionSum -= sample.bytesBeforeCompression; + bytesAfterCompressionSum -= sample.bytesAfterCompression; + timeConsumptionSumNS -= sample.timeConsumptionNS; + } + } +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/CompressionType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/CompressionType.java index 9afa137f27c..86ea6db4dee 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/CompressionType.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/CompressionType.java @@ -34,7 +34,8 @@ public enum CompressionType { /** ZSTD */ ZSTD(".zstd", (byte) 8), /** LZMA2 */ - LZMA2(".lzma2", (byte) 9); + LZMA2(".lzma2", (byte) 9), + AUTO(".auto", (byte) 10); private final String extensionName; private final byte index; @@ -64,6 +65,8 @@ public enum CompressionType { return CompressionType.ZSTD; case 9: return CompressionType.LZMA2; + case 10: + return CompressionType.AUTO; default: throw new IllegalArgumentException("Invalid input: " + compressor); }
