This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch auto_compressor
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b1ecc6ddbbfab00a9d7a3fb9c5f3816bbc58c9bd
Author: Tian Jiang <[email protected]>
AuthorDate: Mon May 29 11:01:08 2023 +0800

    add auto compressor
---
 .../apache/iotdb/tsfile/compress/ICompressor.java  |   3 +
 .../iotdb/tsfile/compress/auto/AutoCompressor.java | 116 ++++++++++
 .../tsfile/compress/auto/AutoUncompressor.java     |  74 +++++++
 .../tsfile/compress/auto/CompressionSampler.java   | 246 +++++++++++++++++++++
 .../file/metadata/enums/CompressionType.java       |   5 +-
 5 files changed, 443 insertions(+), 1 deletion(-)

diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/ICompressor.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/ICompressor.java
index ed6d9bdcc59..84e2614c67d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/ICompressor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/ICompressor.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.tsfile.compress;
 
+import org.apache.iotdb.tsfile.compress.auto.AutoCompressor;
 import 
org.apache.iotdb.tsfile.exception.compress.CompressionTypeNotSupportedException;
 import 
org.apache.iotdb.tsfile.exception.compress.GZIPCompressOverflowException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -75,6 +76,8 @@ public interface ICompressor extends Serializable {
         return new ZstdCompressor();
       case LZMA2:
         return new LZMA2Compressor();
+      case AUTO:
+        return new AutoCompressor();
       default:
         throw new CompressionTypeNotSupportedException(name.toString());
     }
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/AutoCompressor.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/AutoCompressor.java
new file mode 100644
index 00000000000..62f2877a534
--- /dev/null
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/AutoCompressor.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.compress.auto;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+
+public class AutoCompressor implements ICompressor {
+
+  private CompressionSampler sampler;
+
+  public AutoCompressor() {
+    List<CompressionType> compressionTypes = collectCompressionTypes();
+    double alpha = 1.0;
+    long minSampleIntervalMS = 1000;
+    sampler = new CompressionSampler(compressionTypes, alpha, 
minSampleIntervalMS);
+  }
+
+  public AutoCompressor(double alpha, long minSampleIntervalMS) {
+    List<CompressionType> compressionTypes = collectCompressionTypes();
+    sampler = new CompressionSampler(compressionTypes, alpha, 
minSampleIntervalMS);
+  }
+
+  private static List<CompressionType> collectCompressionTypes() {
+    List<CompressionType> compressionTypeList = new ArrayList<>(
+        CompressionType.values().length - 1);
+    for (CompressionType type : CompressionType.values()) {
+      if (!type.equals(CompressionType.AUTO) && 
!type.equals(CompressionType.UNCOMPRESSED)) {
+        compressionTypeList.add(type);
+      }
+    }
+    return compressionTypeList;
+  }
+
+  @Override
+  public byte[] compress(byte[] data) throws IOException {
+    if (sampler.shouldSample()) {
+      return sampler.sample(data);
+    }
+    ICompressor preferredSampler = sampler.getPreferredSampler();
+    byte[] compress = preferredSampler.compress(data);
+    byte[] result = new byte[compress.length + 1];
+    System.arraycopy(compress, 0, result, 0, compress.length);
+    result[compress.length] = preferredSampler.getType().serialize();
+    return result;
+  }
+
+  @Override
+  public byte[] compress(byte[] data, int offset, int length) throws 
IOException {
+    if (sampler.shouldSample()) {
+      return sampler.sample(data, offset, length);
+    }
+    ICompressor preferredSampler = sampler.getPreferredSampler();
+    byte[] compress = preferredSampler.compress(data, offset, length);
+    byte[] result = new byte[compress.length + 1];
+    System.arraycopy(compress, 0, result, 0, compress.length);
+    result[compress.length] = preferredSampler.getType().serialize();
+    return result;
+  }
+
+  @Override
+  public int compress(byte[] data, int offset, int length, byte[] compressed) 
throws IOException {
+    if (sampler.shouldSample()) {
+      return sampler.sample(data, offset, length, compressed);
+    }
+    ICompressor preferredSampler = sampler.getPreferredSampler();
+    int compressedLength = preferredSampler.compress(data, offset, length, 
compressed);
+    compressed[compressedLength] = preferredSampler.getType().serialize();
+    return compressedLength + 1;
+  }
+
+  @Override
+  public int compress(ByteBuffer data, ByteBuffer compressed) throws 
IOException {
+    if (sampler.shouldSample()) {
+      return sampler.sample(data, compressed);
+    }
+    ICompressor preferredSampler = sampler.getPreferredSampler();
+    int compressedLength = preferredSampler.compress(data, compressed);
+    compressed.mark();
+    compressed.position(compressed.position() + compressedLength);
+    compressed.put(preferredSampler.getType().serialize());
+    compressed.reset();
+    return compressedLength + 1;
+  }
+
+  @Override
+  public int getMaxBytesForCompression(int uncompressedDataSize) {
+    return sampler.getMaxBytesForCompression(uncompressedDataSize);
+  }
+
+  @Override
+  public CompressionType getType() {
+    return CompressionType.AUTO;
+  }
+}
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/AutoUncompressor.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/AutoUncompressor.java
new file mode 100644
index 00000000000..037ffedabd7
--- /dev/null
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/AutoUncompressor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.compress.auto;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+
+public class AutoUncompressor implements IUnCompressor {
+
+  @Override
+  public int getUncompressedLength(byte[] array, int offset, int length) 
throws IOException {
+    byte realType = array[offset + length - 1];
+    IUnCompressor unCompressor = IUnCompressor.getUnCompressor(
+        CompressionType.deserialize(realType));
+    return unCompressor.getUncompressedLength(array, offset, length);
+  }
+
+  @Override
+  public int getUncompressedLength(ByteBuffer buffer) throws IOException {
+    byte realType = buffer.array()[buffer.position() + buffer.remaining() - 1];
+    IUnCompressor unCompressor = IUnCompressor.getUnCompressor(
+        CompressionType.deserialize(realType));
+    return unCompressor.getUncompressedLength(buffer);
+  }
+
+  @Override
+  public byte[] uncompress(byte[] byteArray) throws IOException {
+    byte realType = byteArray[byteArray.length - 1];
+    IUnCompressor unCompressor = IUnCompressor.getUnCompressor(
+        CompressionType.deserialize(realType));
+    return unCompressor.uncompress(byteArray);
+  }
+
+  @Override
+  public int uncompress(byte[] byteArray, int offset, int length, byte[] 
output, int outOffset)
+      throws IOException {
+    byte realType = byteArray[offset + length - 1];
+    IUnCompressor unCompressor = IUnCompressor.getUnCompressor(
+        CompressionType.deserialize(realType));
+    return unCompressor.uncompress(byteArray, offset, length, output, 
outOffset);
+  }
+
+  @Override
+  public int uncompress(ByteBuffer compressed, ByteBuffer uncompressed) throws 
IOException {
+    byte realType = compressed.array()[compressed.position() + 
compressed.remaining() - 1];
+    IUnCompressor unCompressor = IUnCompressor.getUnCompressor(
+        CompressionType.deserialize(realType));
+    return unCompressor.uncompress(compressed, uncompressed);
+  }
+
+  @Override
+  public CompressionType getCodecName() {
+    return null;
+  }
+}
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/CompressionSampler.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/CompressionSampler.java
new file mode 100644
index 00000000000..8358c4896e5
--- /dev/null
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/CompressionSampler.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.compress.auto;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CompressionSampler {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(CompressionSampler.class);
+
+  private List<CompressionType> compressionTypes;
+  private long minSampleInterval;
+  private long lastSampleTimeMS;
+  private List<ICompressor> compressors;
+  private List<CompressionMonitor> monitors;
+  private int preferredCompressorIndex;
+
+  public CompressionSampler(List<CompressionType> compressionTypes, double 
alpha,
+      long minSampleInterval) {
+    this.compressionTypes = compressionTypes;
+    this.minSampleInterval = minSampleInterval;
+    this.monitors = new ArrayList<>(compressionTypes.size());
+    this.compressors = new ArrayList<>(compressionTypes.size());
+
+    int maxSampleNum = 10;
+
+    for (CompressionType compressionType : compressionTypes) {
+      monitors.add(new CompressionMonitor(maxSampleNum, alpha));
+      compressors.add(ICompressor.getCompressor(compressionType));
+    }
+  }
+
+  public boolean shouldSample() {
+    return System.currentTimeMillis() - lastSampleTimeMS >= minSampleInterval;
+  }
+
+  public ICompressor getPreferredSampler() {
+    return compressors.get(preferredCompressorIndex);
+  }
+
+  public byte[] sample(byte[] data) throws IOException {
+    return sample(data, 0, data.length);
+  }
+
+  public byte[] sample(byte[] data, int offset, int length) throws IOException 
{
+    CompressionType bestType = CompressionType.UNCOMPRESSED;
+    int smallestLength = length;
+    byte[] bestResult = data;
+
+    for (int i = 0; i < compressionTypes.size(); i++) {
+      ICompressor compressor = compressors.get(i);
+      CompressionMonitor monitor = monitors.get(i);
+      long startTime = System.currentTimeMillis();
+      byte[] compressed = compressor.compress(data, offset, length);
+      int bytesBeforeCompression = data.length;
+      int bytesAfterCompression = compressed.length;
+      long timeConsumption = System.currentTimeMillis() - startTime;
+      monitor.addSample(bytesBeforeCompression, bytesAfterCompression, 
timeConsumption);
+
+      if (bytesAfterCompression < smallestLength) {
+        smallestLength = bytesAfterCompression;
+        bestType = compressionTypes.get(i);
+        bestResult = compressed;
+      }
+    }
+
+    lastSampleTimeMS = System.currentTimeMillis();
+    updatePreferredIndex();
+
+    byte[] result = new byte[bestResult.length + 1];
+    System.arraycopy(bestResult, 0, result, 0, bestResult.length);
+    result[bestResult.length] = bestType.serialize();
+    return result;
+  }
+
+  public int sample(byte[] data, int offset, int length, byte[] compressed) 
throws IOException {
+    CompressionType bestType = CompressionType.UNCOMPRESSED;
+    int smallestLength = length;
+
+    for (int i = 0; i < compressionTypes.size(); i++) {
+      ICompressor compressor = compressors.get(i);
+      CompressionMonitor monitor = monitors.get(i);
+      long startTime = System.currentTimeMillis();
+      int bytesAfterCompression = compressor.compress(data, offset, length, 
compressed);
+      int bytesBeforeCompression = data.length;
+      long timeConsumption = System.currentTimeMillis() - startTime;
+      monitor.addSample(bytesBeforeCompression, bytesAfterCompression, 
timeConsumption);
+
+      if (bytesAfterCompression < smallestLength) {
+        smallestLength = bytesAfterCompression;
+        bestType = compressionTypes.get(i);
+      }
+    }
+
+    lastSampleTimeMS = System.currentTimeMillis();
+    updatePreferredIndex();
+
+    compressed[smallestLength] = bestType.serialize();
+    return smallestLength + 1;
+  }
+
+  public int sample(ByteBuffer data, ByteBuffer compressed) throws IOException 
{
+    CompressionType bestType = CompressionType.UNCOMPRESSED;
+    int smallestLength = data.remaining();
+
+    for (int i = 0; i < compressionTypes.size(); i++) {
+      ICompressor compressor = compressors.get(i);
+      CompressionMonitor monitor = monitors.get(i);
+      long startTime = System.currentTimeMillis();
+      int bytesAfterCompression = compressor.compress(data, compressed);
+      int bytesBeforeCompression = data.remaining();
+      long timeConsumption = System.currentTimeMillis() - startTime;
+      monitor.addSample(bytesBeforeCompression, bytesAfterCompression, 
timeConsumption);
+
+      if (bytesAfterCompression < smallestLength) {
+        smallestLength = bytesAfterCompression;
+        bestType = compressionTypes.get(i);
+      }
+    }
+
+    lastSampleTimeMS = System.currentTimeMillis();
+    updatePreferredIndex();
+
+    compressed.mark();
+    compressed.position(compressed.position() + smallestLength);
+    compressed.put(bestType.serialize());
+    compressed.reset();
+    return smallestLength + 1;
+  }
+
+  public int getMaxBytesForCompression(int uncompressedDataSize) {
+    int maxBytes = 0;
+    for (ICompressor compressor : compressors) {
+      maxBytes = Math.max(maxBytes, 
compressor.getMaxBytesForCompression(uncompressedDataSize));
+    }
+    return maxBytes;
+  }
+
+
+  private void updatePreferredIndex() {
+    double bestScore = 0;
+    int prevIndex = preferredCompressorIndex;
+    for (int i = 0; i < monitors.size(); i++) {
+      double score = monitors.get(i).score();
+      if (score > bestScore) {
+        preferredCompressorIndex = i;
+      }
+    }
+    if (prevIndex != preferredCompressorIndex) {
+      logger.info("Preferred compressor changed to {}", 
compressors.get(preferredCompressorIndex));
+    }
+  }
+
+  private static class CompressionSample {
+
+    private long bytesBeforeCompression;
+    private long bytesAfterCompression;
+    private long timeConsumptionNS;
+
+    public CompressionSample(long bytesBeforeCompression, long 
bytesAfterCompression,
+        long timeConsumptionNS) {
+      this.bytesBeforeCompression = bytesBeforeCompression;
+      this.bytesAfterCompression = bytesAfterCompression;
+      this.timeConsumptionNS = timeConsumptionNS;
+    }
+  }
+
+  private static class CompressionMonitor {
+
+    private Queue<CompressionSample> samples;
+    private int maxSampleNum;
+    private double alpha;
+    private long bytesBeforeCompressionSum;
+    private long bytesAfterCompressionSum;
+    private long timeConsumptionSumNS;
+
+    private CompressionMonitor(int maxSampleNum, double alpha) {
+      this.maxSampleNum = maxSampleNum;
+      this.samples = new ArrayDeque<>(maxSampleNum);
+      this.alpha = alpha;
+    }
+
+    private double compressionRatio() {
+      return bytesAfterCompressionSum * 1.0 / bytesBeforeCompressionSum;
+    }
+
+    private double throughput() {
+      return bytesBeforeCompressionSum * 1.0 / timeConsumptionSumNS;
+    }
+
+    private double score() {
+      return Math.pow(throughput(), alpha) / compressionRatio();
+    }
+
+    private void addSample(long bytesBeforeCompression, long 
bytesAfterCompression,
+        long timeConsumptionNS) {
+      CompressionSample sample = new CompressionSample(bytesBeforeCompression,
+          bytesAfterCompression, timeConsumptionNS);
+      if (samples.size() < maxSampleNum) {
+        addSample(sample);
+      } else {
+        removeSample();
+      }
+    }
+
+    private void addSample(CompressionSample sample) {
+      bytesAfterCompressionSum += sample.bytesAfterCompression;
+      bytesBeforeCompressionSum += sample.bytesBeforeCompression;
+      timeConsumptionSumNS += sample.timeConsumptionNS;
+      samples.add(sample);
+    }
+
+    private void removeSample() {
+      CompressionSample sample = samples.remove();
+      bytesBeforeCompressionSum -= sample.bytesBeforeCompression;
+      bytesAfterCompressionSum -= sample.bytesAfterCompression;
+      timeConsumptionSumNS -= sample.timeConsumptionNS;
+    }
+  }
+}
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/CompressionType.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/CompressionType.java
index 9afa137f27c..86ea6db4dee 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/CompressionType.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/CompressionType.java
@@ -34,7 +34,8 @@ public enum CompressionType {
   /** ZSTD */
   ZSTD(".zstd", (byte) 8),
   /** LZMA2 */
-  LZMA2(".lzma2", (byte) 9);
+  LZMA2(".lzma2", (byte) 9),
+  AUTO(".auto", (byte) 10);
 
   private final String extensionName;
   private final byte index;
@@ -64,6 +65,8 @@ public enum CompressionType {
         return CompressionType.ZSTD;
       case 9:
         return CompressionType.LZMA2;
+      case 10:
+        return CompressionType.AUTO;
       default:
         throw new IllegalArgumentException("Invalid input: " + compressor);
     }

Reply via email to