This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch 1.1.0-anke
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/1.1.0-anke by this push:
new a9a355c0 add freq encoding
a9a355c0 is described below
commit a9a355c0c6f500be88ec66982495b2a2d6a4b2e9
Author: DESKTOP-L0L5GPJ\jt <[email protected]>
AuthorDate: Sat Jan 11 17:55:44 2025 +0800
add freq encoding
---
java/pom.xml | 5 +
java/tsfile/pom.xml | 4 +
.../apache/tsfile/common/conf/TSFileConfig.java | 22 ++
.../apache/tsfile/encoding/decoder/Decoder.java | 2 +
.../tsfile/encoding/decoder/FreqDecoder.java | 144 +++++++++
.../tsfile/encoding/encoder/FreqEncoder.java | 321 +++++++++++++++++++++
.../tsfile/encoding/encoder/TSEncodingBuilder.java | 62 ++++
.../org/apache/tsfile/utils/BitConstructor.java | 94 ++++++
.../java/org/apache/tsfile/utils/BitReader.java | 70 +++++
9 files changed, 724 insertions(+)
diff --git a/java/pom.xml b/java/pom.xml
index 32642d61..4fbbe5e0 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -100,6 +100,11 @@
<artifactId>gson</artifactId>
<version>2.10.1</version>
</dependency>
+ <dependency>
+ <groupId>com.github.wendykierp</groupId>
+ <artifactId>JTransforms</artifactId>
+ <version>3.1</version>
+ </dependency>
</dependencies>
</dependencyManagement>
</project>
diff --git a/java/tsfile/pom.xml b/java/tsfile/pom.xml
index a3d8ec1c..00534013 100644
--- a/java/tsfile/pom.xml
+++ b/java/tsfile/pom.xml
@@ -72,6 +72,10 @@
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.github.wendykierp</groupId>
+ <artifactId>JTransforms</artifactId>
+ </dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java
b/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java
index 8c95fb96..8b459f77 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java
@@ -219,6 +219,12 @@ public class TSFileConfig implements Serializable {
/** customizedProperties, this should be empty by default. */
private Properties customizedProperties = new Properties();
+  /** Target signal-to-noise ratio (in dB) for the FREQ encoding; defaults to 40.0. */
+  private double freqEncodingSNR = 40.0;
+
+  /** Values per FREQ block; defaults to 1024 (the block header stores it in 16 bits). */
+  private int freqEncodingBlockSize = 1024;
+
public TSFileConfig() {
// do nothing because we already give default value to each field when
they are being declared
}
@@ -602,4 +608,20 @@ public class TSFileConfig implements Serializable {
public void setObjectStorageTsFileOutput(String objectStorageTsFileOutput) {
this.objectStorageTsFileOutput = objectStorageTsFileOutput;
}
+
+  /** @return the target signal-to-noise ratio (in dB) used by the FREQ encoding */
+  public double getFreqEncodingSNR() {
+    return this.freqEncodingSNR;
+  }
+
+  /**
+   * Sets the target signal-to-noise ratio (in dB) used by the FREQ encoding.
+   *
+   * @param freqEncodingSNR the new SNR; stored as given, without validation
+   */
+  public void setFreqEncodingSNR(double freqEncodingSNR) {
+    this.freqEncodingSNR = freqEncodingSNR;
+  }
+
+  /** @return the number of values per FREQ-encoded block */
+  public int getFreqEncodingBlockSize() {
+    return this.freqEncodingBlockSize;
+  }
+
+  /**
+   * Sets the number of values per FREQ-encoded block.
+   *
+   * @param freqEncodingBlockSize the new block size; stored as given, without validation
+   */
+  public void setFreqEncodingBlockSize(int freqEncodingBlockSize) {
+    this.freqEncodingBlockSize = freqEncodingBlockSize;
+  }
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java
index 36c3d826..d2ea2518 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java
@@ -177,6 +177,8 @@ public abstract class Decoder {
default:
throw new TsFileDecodingException(String.format(ERROR_MSG,
encoding, dataType));
}
+ case FREQ:
+ return new FreqDecoder();
default:
throw new TsFileDecodingException(String.format(ERROR_MSG, encoding,
dataType));
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/FreqDecoder.java
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/FreqDecoder.java
new file mode 100644
index 00000000..0dbb3dba
--- /dev/null
+++
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/FreqDecoder.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.encoding.decoder;
+
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.BitReader;
+
+import org.jtransforms.dct.DoubleDCT_1D;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Decoder for the lossy FREQ encoding. Each block stores a sparse set of quantized DCT
+ * coefficients; the decoder rebuilds the coefficient vector and applies an inverse DCT.
+ */
+public class FreqDecoder extends Decoder {
+
+  /** Reconstructed values of the block currently being served. */
+  private double[] data;
+
+  /** Number of values in the current block, as read from its header. */
+  private int readTotalCount = 0;
+
+  /** Position in {@link #data} of the next value to return. */
+  private int nextReadIndex = 0;
+
+  public FreqDecoder() {
+    super(TSEncoding.FREQ);
+  }
+
+  /** Returns the next value, decoding a fresh block from {@code buffer} once the current one is used up. */
+  @Override
+  public double readDouble(ByteBuffer buffer) {
+    boolean currentBlockConsumed = nextReadIndex == readTotalCount;
+    if (currentBlockConsumed) {
+      loadBlock(buffer);
+      nextReadIndex = 0;
+    }
+    double result = data[nextReadIndex];
+    nextReadIndex++;
+    return result;
+  }
+
+  @Override
+  public float readFloat(ByteBuffer buffer) {
+    double decoded = readDouble(buffer);
+    return (float) decoded;
+  }
+
+  @Override
+  public int readInt(ByteBuffer buffer) {
+    long rounded = Math.round(readDouble(buffer));
+    return (int) rounded;
+  }
+
+  @Override
+  public long readLong(ByteBuffer buffer) {
+    return Math.round(readDouble(buffer));
+  }
+
+  @Override
+  public boolean hasNext(ByteBuffer buffer) throws IOException {
+    if (nextReadIndex < readTotalCount) {
+      return true;
+    }
+    return buffer.hasRemaining();
+  }
+
+  @Override
+  public void reset() {
+    readTotalCount = 0;
+    nextReadIndex = 0;
+  }
+
+  /**
+   * Reads one block: a 48-bit header (block size, kept-component count, signed quantization
+   * exponent; 16 bits each), the index and value sequences, then padding up to the next byte.
+   */
+  private void loadBlock(ByteBuffer buffer) {
+    BitReader bits = new BitReader(buffer);
+    this.readTotalCount = (int) bits.next(16);
+    int kept = (int) bits.next(16);
+    // The exponent is written as 16 raw bits; the short cast restores its sign.
+    int beta = (short) bits.next(16);
+    int[] positions = decodeIndex(kept, bits);
+    long[] levels = decodeValue(kept, bits);
+    bits.skip();
+    double step = Math.pow(2, beta);
+    double[] coefficients = new double[readTotalCount];
+    this.data = coefficients;
+    for (int k = 0; k < kept; k++) {
+      coefficients[positions[k]] = levels[k] * step;
+    }
+    new DoubleDCT_1D(readTotalCount).inverse(coefficients, true);
+  }
+
+  /**
+   * Reads {@code m} signed quantized values. Each magnitude is stored as an offset from the
+   * transmitted minimum, using the bit width of the previous offset.
+   */
+  private long[] decodeValue(int m, BitReader reader) {
+    if (m == 0) {
+      return new long[0];
+    }
+    int width = (int) reader.next(8);
+    long offset = reader.next(width);
+    long[] values = new long[m];
+    for (int i = 0; i < m; i++) {
+      boolean negative = reader.next(1) == 1;
+      long residual = reader.next(width);
+      width = getValueWidth(residual);
+      long magnitude = residual + offset;
+      values[i] = negative ? -magnitude : magnitude;
+    }
+    return values;
+  }
+
+  /** Reads {@code m} component indices stored in groups of 8, each group prefixed by its bit width. */
+  private int[] decodeIndex(int m, BitReader reader) {
+    int[] positions = new int[m];
+    int widthOfWidth = getValueWidth(getValueWidth(readTotalCount - 1));
+    int start = 0;
+    while (start < m) {
+      int width = (int) reader.next(widthOfWidth);
+      int end = Math.min(start + 8, m);
+      for (int j = start; j < end; j++) {
+        positions[j] = (int) reader.next(width);
+      }
+      start += 8;
+    }
+    return positions;
+  }
+
+  /**
+   * Number of significant bits in {@code x}.
+   *
+   * @param x value to measure
+   * @return position of the highest set bit plus one, or 0 when {@code x == 0}
+   */
+  private int getValueWidth(long x) {
+    return Long.SIZE - Long.numberOfLeadingZeros(x);
+  }
+}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/FreqEncoder.java
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/FreqEncoder.java
new file mode 100644
index 00000000..54c89745
--- /dev/null
+++
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/FreqEncoder.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.encoding.encoder;
+
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.BitConstructor;
+
+import org.jtransforms.dct.DoubleDCT_1D;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.PriorityQueue;
+
+/**
+ * Lossy FREQ encoder. Values are buffered into blocks; each block is DCT-transformed, the
+ * highest-energy coefficients are kept and quantized with step 2^beta so that the relative error
+ * energy stays within 10^(-snr/10), and the result is bit-packed.
+ */
+public class FreqEncoder extends Encoder {
+
+  public static final String FREQ_ENCODING_SNR = "freq_encoding_snr";
+  public static final String FREQ_ENCODING_BLOCK_SIZE = "freq_encoding_block_size";
+  protected static final int BLOCK_DEFAULT_SIZE = 1024;
+  protected static final double DEFAULT_SNR = 40;
+
+  /**
+   * Largest supported block size. The number of values in a block is written as a 16-bit header
+   * field in encodeBlock, so larger blocks would be truncated and decoded incorrectly.
+   */
+  private static final int MAX_BLOCK_SIZE = 0xFFFF;
+
+  private static final Logger logger = LoggerFactory.getLogger(FreqEncoder.class);
+
+  /** Number of values collected before a block is transformed and written. */
+  private int blockSize;
+
+  /** Number of values currently buffered in dataBuffer. */
+  protected int writeIndex = 0;
+
+  /** Accepted upper bound of (error energy / signal energy), i.e. 10^(-snr/10). */
+  private double threshold = 1e-4;
+
+  /** Exponent of the quantization step 2^beta chosen for the block being written. */
+  private int beta;
+
+  private double[] dataBuffer;
+
+  /** DCT instance reused for full blocks; partial blocks get their own instance. */
+  private DoubleDCT_1D transformer;
+
+  /** Creates an encoder using BLOCK_DEFAULT_SIZE and DEFAULT_SNR. */
+  public FreqEncoder() {
+    this(BLOCK_DEFAULT_SIZE);
+  }
+
+  /**
+   * Creates an encoder with the given block size and DEFAULT_SNR.
+   *
+   * @param size number of values per block, in [1, 65535]
+   */
+  public FreqEncoder(int size) {
+    this(size, DEFAULT_SNR);
+  }
+
+  /**
+   * Creates an encoder with the given block size and target signal-to-noise ratio.
+   *
+   * @param size number of values per block, in [1, 65535]
+   * @param snr target SNR in dB; negative values are clamped to 0
+   * @throws IllegalArgumentException if {@code size} is outside [1, 65535]
+   */
+  public FreqEncoder(int size, double snr) {
+    super(TSEncoding.FREQ);
+    if (size < 1 || size > MAX_BLOCK_SIZE) {
+      throw new IllegalArgumentException(
+          "FREQ encoding block size must be in [1, " + MAX_BLOCK_SIZE + "], but was " + size);
+    }
+    this.blockSize = size;
+    this.transformer = new DoubleDCT_1D(blockSize);
+    this.dataBuffer = new double[blockSize];
+    snr = Math.max(snr, 0);
+    this.threshold = Math.pow(10, -snr / 10);
+  }
+
+  /** Buffers {@code value}; a full block is encoded and written to {@code out} immediately. */
+  @Override
+  public void encode(double value, ByteArrayOutputStream out) {
+    dataBuffer[writeIndex] = value;
+    writeIndex++;
+    if (writeIndex == blockSize) {
+      flush(out);
+    }
+  }
+
+  @Override
+  public void encode(float value, ByteArrayOutputStream out) {
+    encode((double) value, out);
+  }
+
+  @Override
+  public void encode(int value, ByteArrayOutputStream out) {
+    encode((double) value, out);
+  }
+
+  @Override
+  public void encode(long value, ByteArrayOutputStream out) {
+    encode((double) value, out);
+  }
+
+  /** Encodes any buffered values as a (possibly partial) block; write failures are only logged. */
+  @Override
+  public void flush(ByteArrayOutputStream out) {
+    try {
+      flushBlock(out);
+    } catch (IOException e) {
+      logger.error("flush data to stream failed!", e);
+    }
+  }
+
+  @Override
+  public int getOneItemMaxSize() {
+    return 13;
+  }
+
+  /** @return size estimate: 8 bytes plus 13 bytes per buffered value */
+  @Override
+  public long getMaxByteSize() {
+    return 8 + 13 * writeIndex;
+  }
+
+  private void flushBlock(ByteArrayOutputStream out) throws IOException {
+    if (writeIndex > 0) {
+      dct();
+      ArrayList<Point> list = selectPoints(dataBuffer);
+      byte[] data = encodeBlock(list);
+      out.write(data);
+      writeIndex = 0;
+    }
+  }
+
+  /** Replaces the first writeIndex entries of dataBuffer with their orthonormal DCT. */
+  private void dct() {
+    DoubleDCT_1D dct =
+        (writeIndex == this.blockSize) ? this.transformer : new DoubleDCT_1D(writeIndex);
+    dct.forward(dataBuffer, true);
+  }
+
+  /**
+   * Serializes one block: block size, kept-component count and beta (16 bits each), then the
+   * index and value sequences, padded to a whole byte.
+   */
+  private byte[] encodeBlock(ArrayList<Point> list) {
+    // Quantification
+    int m = list.size();
+    int[] index = new int[m];
+    long[] value = new long[m];
+    double eps = Math.pow(2, beta);
+    for (int i = 0; i < m; i++) {
+      Point p = list.get(i);
+      index[i] = p.getIndex();
+      value[i] = Math.round(p.getValue() / eps);
+    }
+    BitConstructor constructor = new BitConstructor(9 + 13 * m);
+    // Block size with 16 bits
+    constructor.add(writeIndex, 16);
+    // Number of reserved components with 16 bits
+    constructor.add(m, 16);
+    // Exponent of quantification level with 16 bits
+    constructor.add(beta, 16);
+    // Encode the index sequence
+    encodeIndex(index, constructor);
+    // Encode the value sequence
+    encodeValue(value, constructor);
+    constructor.pad();
+    // return the encoded bytes
+    return constructor.toByteArray();
+  }
+
+  /**
+   * Writes indices in groups of 8. Each group is prefixed by the bit width of its largest index,
+   * itself stored in width(width(writeIndex - 1)) bits.
+   */
+  private void encodeIndex(int[] value, BitConstructor constructor) {
+    int bitsWidth = getValueWidth(getValueWidth(writeIndex - 1));
+    for (int i = 0; i < value.length; i += 8) {
+      int bits = 0;
+      for (int j = i; j < Math.min(value.length, i + 8); j++) {
+        bits = Math.max(bits, getValueWidth(value[j]));
+      }
+      constructor.add(bits, bitsWidth);
+      for (int j = i; j < Math.min(value.length, i + 8); j++) {
+        constructor.add(value[j], bits);
+      }
+    }
+  }
+
+  /**
+   * Writes sign bits and magnitudes as offsets from the last (smallest) magnitude. This relies on
+   * {@code value} being in non-increasing |value| order, as produced by selectPoints, so that
+   * each offset fits in the bit width of the previous one. Mutates {@code value}.
+   */
+  private void encodeValue(long[] value, BitConstructor constructor) {
+    if (value.length == 0) {
+      return;
+    }
+    // Encode the encoded bit width of the first value with 8 bits
+    int bits = getValueWidth(Math.abs(value[0]));
+    constructor.add(bits, 8);
+    // Encode min{|v|}
+    long min = Math.abs(value[value.length - 1]);
+    constructor.add(min, bits);
+    // Encode all values
+    for (int i = 0; i < value.length; i++) {
+      constructor.add(value[i] >= 0 ? 0 : 1, 1); // Symbol bit
+      value[i] = Math.abs(value[i]) - min;
+      constructor.add(value[i], bits);
+      bits = getValueWidth(value[i]);
+    }
+  }
+
+  /**
+   * Get the valid bit width of x
+   *
+   * @param x value to measure
+   * @return position of the highest set bit plus one, or 0 when x == 0
+   */
+  private int getValueWidth(long x) {
+    return 64 - Long.numberOfLeadingZeros(x);
+  }
+
+  /** Initial quantization exponent derived from the error budget and the block energy sum2. */
+  private int initBeta(double sum2) {
+    // Widen before multiplying: writeIndex may reach 65535, and writeIndex * writeIndex would
+    // overflow int, turning temp into NaN.
+    double n = writeIndex;
+    double temp = Math.sqrt(threshold * sum2 / (n * n));
+    return (int) Math.max(max2Power(temp), Math.log(sum2) / (2 * Math.log(2)) - 60);
+  }
+
+  /**
+   * Returns the exponent of the largest power of 2 that is less than or equal to x.<br>
+   * max{y|2^y &le; x, y is an integer}
+   *
+   * <p>NOTE(review): the {@code x > 1} loop never terminates for x = +Infinity, which initBeta
+   * passes when the sum of squared coefficients overflows; NaN falls into the second branch and
+   * yields 0.
+   *
+   * @param x a non-negative value
+   * @return the exponent of the largest power of 2 that is less than or equal to x
+   */
+  private int max2Power(double x) {
+    double ans = 1;
+    int exponent = 0;
+    if (x > 1) {
+      while (ans * 2 <= x) {
+        ans = ans * 2;
+        exponent++;
+      }
+    } else {
+      while (ans > x) {
+        ans = ans / 2;
+        exponent--;
+      }
+    }
+    return exponent;
+  }
+
+  /**
+   * Chooses which DCT coefficients to keep and sets {@link #beta}. The highest-energy coefficients
+   * are kept until dropped energy plus estimated rounding error is within threshold * sum2. Then
+   * beta is increased step by step; each step saves about one bit per kept component but may
+   * require more components. The search stops when that trade-off is no longer favourable, or
+   * the error budget is exceeded, and rolls back the last step.
+   *
+   * @param a transformed block; only the first writeIndex entries are used
+   * @return kept components in descending order of energy
+   */
+  private ArrayList<Point> selectPoints(double[] a) {
+    // Keep the components with priority queue in the descending order of energy
+    double sum2 = 0;
+    Point point;
+    PriorityQueue<Point> queue = new PriorityQueue<>(writeIndex);
+    for (int i = 0; i < writeIndex; i++) {
+      point = new Point(i, a[i]);
+      queue.add(point);
+      sum2 += point.getPower();
+    }
+    // Add components to keepList
+    this.beta = initBeta(sum2);
+    double systemError = sum2;
+    ArrayList<Point> keepList = new ArrayList<>();
+    int m = 0; // Number of reserved components
+    double roundingError = 0;
+    double reduceBits = Double.MAX_VALUE;
+    boolean first = true;
+    do {
+      while (systemError + roundingError > threshold * sum2) {
+        point = queue.poll();
+        if (point == null) {
+          systemError = 0;
+          break;
+        }
+        keepList.add(point);
+        systemError = systemError - point.getPower();
+        roundingError = Math.pow(2, this.beta * 2) * keepList.size();
+      }
+      double increaseBits = estimateIncreaseBits(keepList, m);
+      if (reduceBits <= increaseBits || systemError + roundingError > threshold * sum2) {
+        if (!first) {
+          // Roll back to the state before the last beta increase.
+          keepList = new ArrayList<>(keepList.subList(0, m));
+          this.beta--;
+        }
+        break;
+      }
+      // Increase quantification level
+      first = false;
+      m = keepList.size();
+      reduceBits = m;
+      this.beta++;
+      roundingError = Math.pow(2, this.beta * 2) * m;
+    } while (true);
+    return keepList;
+  }
+
+  /**
+   * Estimate the number of increased bits by reserving more components
+   *
+   * @param list The list of reserved components in this turn
+   * @param m The number of reserved components in the last turn
+   * @return Estimated number of bits
+   */
+  private double estimateIncreaseBits(ArrayList<Point> list, int m) {
+    double bits = 0;
+    double eps = Math.pow(2, beta);
+    for (int i = m; i < list.size(); i++) {
+      bits += getValueWidth(writeIndex - 1); // Index
+      bits += getValueWidth(Math.round(Math.abs(list.get(i).getValue()) / eps)); // Value
+      bits += 1; // Symbol
+    }
+    return bits;
+  }
+
+  /**
+   * A DCT coefficient and its position. Ordered by descending power so that a PriorityQueue polls
+   * the most energetic coefficient first. Static because it never touches the enclosing encoder.
+   */
+  protected static class Point implements Comparable<Point> {
+
+    private final int index;
+    private final double value;
+
+    public Point(int index, double value) {
+      this.index = index;
+      this.value = value;
+    }
+
+    @Override
+    public int compareTo(Point o) {
+      return Double.compare(o.getPower(), this.getPower());
+    }
+
+    /**
+     * @return the index
+     */
+    public int getIndex() {
+      return index;
+    }
+
+    /**
+     * @return the value
+     */
+    public double getValue() {
+      return value;
+    }
+
+    /** @return the squared coefficient value */
+    public double getPower() {
+      return value * value;
+    }
+  }
+}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java
index 68c7e56b..5ac6dd6a 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java
@@ -430,4 +430,66 @@ public abstract class TSEncodingBuilder {
// allowed do nothing
}
}
+
+  /**
+   * Builder for the lossy FREQ encoding; supports INT32, INT64, FLOAT and DOUBLE. Reads the
+   * optional {@code freq_encoding_snr} and {@code freq_encoding_block_size} properties.
+   */
+  public static class Freq extends TSEncodingBuilder {
+
+    /** FreqEncoder writes the block size as a 16-bit header field, so it cannot exceed this. */
+    private static final int MAX_BLOCK_SIZE = 0xFFFF;
+
+    private double snr = TSFileDescriptor.getInstance().getConfig().getFreqEncodingSNR();
+    private int blockSize = TSFileDescriptor.getInstance().getConfig().getFreqEncodingBlockSize();
+
+    @Override
+    public Encoder getEncoder(TSDataType type) {
+      switch (type) {
+        case INT32:
+        case INT64:
+        case FLOAT:
+        case DOUBLE:
+          return new FreqEncoder(blockSize, snr);
+        default:
+          throw new UnSupportedDataTypeException("FREQ doesn't support data type: " + type);
+      }
+    }
+
+    /**
+     * Sets SNR and block size from {@code props}. A missing, malformed or out-of-range entry
+     * falls back to the value configured in TSFileConfig.
+     */
+    @Override
+    public void initFromProps(Map<String, String> props) {
+      snr = parseSnr(props);
+      blockSize = parseBlockSize(props);
+    }
+
+    /** Parses the SNR property; NaN and negative values are rejected. */
+    private double parseSnr(Map<String, String> props) {
+      double defaultSnr = TSFileDescriptor.getInstance().getConfig().getFreqEncodingSNR();
+      String raw = props == null ? null : props.get(FreqEncoder.FREQ_ENCODING_SNR);
+      if (raw == null) {
+        return defaultSnr;
+      }
+      double parsed;
+      try {
+        parsed = Double.parseDouble(raw);
+      } catch (NumberFormatException e) {
+        logger.warn(
+            "The format of FREQ encoding SNR {} is not correct."
+                + " Using default FREQ encoding SNR.",
+            raw);
+        return defaultSnr;
+      }
+      // !(x >= 0) also rejects NaN: a NaN SNR makes the encoder's threshold NaN, and then no
+      // DCT component would ever be kept.
+      if (!(parsed >= 0)) {
+        logger.warn(
+            "FREQ encoding SNR {} must be a non-negative number, replaced with default value:{}",
+            raw,
+            defaultSnr);
+        return defaultSnr;
+      }
+      return parsed;
+    }
+
+    /** Parses the block-size property; values outside [1, 65535] are rejected. */
+    private int parseBlockSize(Map<String, String> props) {
+      int defaultSize = TSFileDescriptor.getInstance().getConfig().getFreqEncodingBlockSize();
+      String raw = props == null ? null : props.get(FreqEncoder.FREQ_ENCODING_BLOCK_SIZE);
+      if (raw == null) {
+        return defaultSize;
+      }
+      int parsed;
+      try {
+        parsed = Integer.parseInt(raw);
+      } catch (NumberFormatException e) {
+        logger.warn(
+            "The format of FREQ encoding block size {} is not correct."
+                + " Using default FREQ encoding block size.",
+            raw);
+        return defaultSize;
+      }
+      // 0 would make the encoder's DCT and buffer unusable; > 65535 overflows the block header.
+      if (parsed < 1 || parsed > MAX_BLOCK_SIZE) {
+        logger.warn(
+            "FREQ encoding block size {} is out of range [1, {}], replaced with default value:{}",
+            parsed,
+            MAX_BLOCK_SIZE,
+            defaultSize);
+        return defaultSize;
+      }
+      return parsed;
+    }
+  }
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/utils/BitConstructor.java
b/java/tsfile/src/main/java/org/apache/tsfile/utils/BitConstructor.java
new file mode 100644
index 00000000..81ac1f2c
--- /dev/null
+++ b/java/tsfile/src/main/java/org/apache/tsfile/utils/BitConstructor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tsfile.utils;
+
+/** Accumulates bit fields, most significant bit first, into a growable byte array. */
+public class BitConstructor {
+
+  private static final int BITS_IN_A_BYTE = 8;
+  private static final long ALL_MASK = -1;
+  private final ByteArrayList data;
+  /** Partially filled byte; bits are placed from its most significant end. */
+  private byte cache = 0;
+  /** Number of bits already placed in {@link #cache}. */
+  private int cnt = 0;
+
+  public BitConstructor() {
+    this.data = new ByteArrayList();
+  }
+
+  public BitConstructor(int initialCapacity) {
+    this.data = new ByteArrayList(initialCapacity);
+  }
+
+  /**
+   * Appends the lowest {@code len} bits of {@code x}, most significant bit first.
+   *
+   * @param x value whose low bits are written
+   * @param len number of bits to write, in [0, 64]
+   * @throws IllegalArgumentException if {@code len} is outside [0, 64]
+   */
+  public void add(long x, int len) {
+    if (len < 0 || len > Long.SIZE) {
+      throw new IllegalArgumentException("len must be in [0, 64], but was " + len);
+    }
+    x = lowBits(x, len);
+    while (len > 0) {
+      // Number of bits inserted into cache
+      int m = len + cnt >= BITS_IN_A_BYTE ? BITS_IN_A_BYTE - cnt : len;
+      len -= m;
+      cnt += m;
+      // Unsigned shift: with len == 64, x may be negative and >> would drag sign bits into y.
+      byte y = (byte) (x >>> len);
+      y = (byte) (y << (BITS_IN_A_BYTE - cnt));
+      cache = (byte) (cache | y);
+      x = lowBits(x, len);
+      if (cnt == BITS_IN_A_BYTE) {
+        pad();
+      }
+    }
+  }
+
+  /**
+   * Keeps only the lowest {@code len} bits of {@code x}. {@code len == 64} must be special-cased
+   * because Java uses only the low 6 bits of a long shift distance, so {@code -1L << 64 == -1L}.
+   */
+  private static long lowBits(long x, int len) {
+    return len >= Long.SIZE ? x : x & ~(ALL_MASK << len);
+  }
+
+  /**
+   * Returns the written bytes, including a partially filled last byte (its unused low bits are
+   * 0), without consuming it.
+   */
+  public byte[] toByteArray() {
+    byte[] ret;
+    if (cnt > 0) {
+      data.add(cache);
+      ret = data.toArray();
+      data.removeAtIndex(data.size() - 1);
+    } else {
+      ret = data.toArray();
+    }
+    return ret;
+  }
+
+  /** Discards all written bits. */
+  public void clear() {
+    data.clear();
+    cache = 0x00;
+    cnt = 0;
+  }
+
+  /** Fill the rest part of cache with 0 and move to the next byte boundary. */
+  public void pad() {
+    if (cnt > 0) {
+      data.add(cache);
+      cache = 0x00;
+      cnt = 0;
+    }
+  }
+
+  /** Appends all 8 bits of every byte in {@code bytes}. */
+  public void add(byte[] bytes) {
+    if (cnt == 0) {
+      data.addAll(bytes);
+    } else {
+      for (byte aByte : bytes) {
+        add(aByte, 8);
+      }
+    }
+  }
+
+  /** @return number of bytes written so far, counting a partially filled byte */
+  public int sizeInBytes() {
+    return data.size() + (cnt > 0 ? 1 : 0);
+  }
+}
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/utils/BitReader.java
b/java/tsfile/src/main/java/org/apache/tsfile/utils/BitReader.java
new file mode 100644
index 00000000..e883bda9
--- /dev/null
+++ b/java/tsfile/src/main/java/org/apache/tsfile/utils/BitReader.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2021 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tsfile.utils;
+
+import java.nio.ByteBuffer;
+
+/** Reads bit fields, most significant bit first, from a {@link ByteBuffer}. */
+public class BitReader {
+
+  private static final int BITS_IN_A_BYTE = 8;
+  /** MASKS[k] keeps the (8 - k) lowest bits of a byte, i.e. the bits not yet consumed. */
+  private static final byte[] MASKS = {(byte) 0xff, 0x7f, 0x3f, 0x1f, 0x0f, 0x07, 0x03, 0x01};
+  private final ByteBuffer buffer;
+  /** Bits of {@link #cache} already consumed; 8 means a new byte must be fetched. */
+  private int bitCnt = BITS_IN_A_BYTE;
+  private byte cache = 0;
+
+  public BitReader(ByteBuffer buffer) {
+    this.buffer = buffer;
+  }
+
+  /**
+   * Reads the next {@code len} bits as an unsigned value.
+   *
+   * @param len number of bits to read
+   * @return the bits, right-aligned in the result
+   */
+  public long next(int len) {
+    long result = 0;
+    int remaining = len;
+    while (remaining > 0) {
+      if (bitCnt == BITS_IN_A_BYTE) {
+        loadNextByte();
+      }
+      int available = BITS_IN_A_BYTE - bitCnt;
+      int take = Math.min(remaining, available);
+      int unread = (cache & MASKS[bitCnt]) & 0xff;
+      long chunk = unread >>> (available - take);
+      result = (result << take) | chunk;
+      bitCnt += take;
+      remaining -= take;
+    }
+    return result;
+  }
+
+  /**
+   * Reads {@code len} whole bytes, using a bulk read when the reader is byte-aligned.
+   *
+   * @param len number of bytes to read
+   * @return the bytes read
+   */
+  public byte[] nextBytes(int len) {
+    byte[] out = new byte[len];
+    if (bitCnt != BITS_IN_A_BYTE) {
+      for (int i = 0; i < out.length; i++) {
+        out[i] = (byte) next(8);
+      }
+      return out;
+    }
+    buffer.get(out);
+    return out;
+  }
+
+  /** Discards the unread bits of the current byte so the next read starts on a byte boundary. */
+  public void skip() {
+    this.bitCnt = BITS_IN_A_BYTE;
+  }
+
+  private void loadNextByte() {
+    this.cache = buffer.get();
+    this.bitCnt = 0;
+  }
+}