This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 130967a4db392d5068f1fbab78d113e52f320353 Author: Jark Wu <[email protected]> AuthorDate: Wed Apr 29 11:19:29 2020 +0800 [FLINK-16996][table-common] Add binary implementations of internal data structures This closes #11925 --- .../flink/table/data/binary/BinaryArrayData.java | 559 +++++++++ .../flink/table/data/binary/BinaryFormat.java | 73 ++ .../flink/table/data/binary/BinaryMapData.java | 120 ++ .../table/data/binary/BinaryRawValueData.java | 107 ++ .../flink/table/data/binary/BinaryRowData.java | 438 +++++++ .../flink/table/data/binary/BinarySection.java | 75 ++ .../table/data/binary/BinarySegmentUtils.java | 1198 ++++++++++++++++++++ .../flink/table/data/binary/BinaryStringData.java | 856 ++++++++++++++ .../flink/table/data/binary/LazyBinaryFormat.java | 142 +++ .../flink/table/data/binary/MurmurHashUtils.java | 175 +++ .../flink/table/data/binary/NestedRowData.java | 334 ++++++ .../flink/table/data/binary/StringUtf8Utils.java | 306 +++++ .../flink/table/data/binary/TypedSetters.java | 67 ++ 13 files changed, 4450 insertions(+) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryArrayData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryArrayData.java new file mode 100644 index 0000000..30b94e9 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryArrayData.java @@ -0,0 +1,559 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.binary;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+
+import java.lang.reflect.Array;
+
+import static org.apache.flink.core.memory.MemoryUtils.UNSAFE;
+
+/**
+ * A binary implementation of {@link ArrayData} which is backed by {@link MemorySegment}s.
+ *
+ * <p>For fields that hold fixed-length primitive types, such as long, double or int, they are
+ * stored compacted in bytes, just like the original java array.
+ *
+ * <p>The binary layout of {@link BinaryArrayData}:
+ *
+ * <pre>
+ * [size(int)] + [null bits(4-byte word boundaries)] + [values or offset&length] + [variable length part].
+ * </pre>
+ */
+@Internal
+public final class BinaryArrayData extends BinarySection implements ArrayData, TypedSetters {
+
+	/**
+	 * Base offsets of the primitive Java array types, as reported by {@code Unsafe#arrayBaseOffset}.
+	 */
+	private static final int BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+	private static final int BOOLEAN_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(boolean[].class);
+	private static final int SHORT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(short[].class);
+	private static final int INT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(int[].class);
+	private static final int LONG_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(long[].class);
+	private static final int FLOAT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(float[].class);
+	private static final int DOUBLE_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(double[].class);
+
+	public static int calculateHeaderInBytes(int numFields) {
+		return 4 + ((numFields + 31) / 32) * 4;
+	}
+
+	/**
+	 * Stores the real value when the type is primitive.
+	 * Stores the length and offset of the variable-length part when the type is string, map, etc.
+	 */
+	public static int calculateFixLengthPartSize(LogicalType type) {
+		switch (type.getTypeRoot()) {
+			case BOOLEAN:
+			case TINYINT:
+				return 1;
+			case SMALLINT:
+				return 2;
+			case INTEGER:
+			case FLOAT:
+			case DATE:
+			case TIME_WITHOUT_TIME_ZONE:
+			case INTERVAL_YEAR_MONTH:
+				return 4;
+			default:
+				// long and double are 8 bytes.
+				// Stores the length and offset of the variable-length part when type is string, map, etc.
+				return 8;
+		}
+	}
+
+	// The number of elements in this array
+	private int size;
+
+	/** The position to start storing array elements. */
+	private int elementOffset;
+
+	public BinaryArrayData() {}
+
+	private void assertIndexIsValid(int ordinal) {
+		assert ordinal >= 0 : "ordinal (" + ordinal + ") should >= 0";
+		assert ordinal < size : "ordinal (" + ordinal + ") should < " + size;
+	}
+
+	private int getElementOffset(int ordinal, int elementSize) {
+		return elementOffset + ordinal * elementSize;
+	}
+
+	@Override
+	public int size() {
+		return size;
+	}
+
+	@Override
+	public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) {
+		// Read the number of elements from the first 4 bytes.
+		final int size = BinarySegmentUtils.getInt(segments, offset);
+		assert size >= 0 : "size (" + size + ") should >= 0";
+
+		this.size = size;
+		this.segments = segments;
+		this.offset = offset;
+		this.sizeInBytes = sizeInBytes;
+		this.elementOffset = offset + calculateHeaderInBytes(this.size);
+	}
+
+	@Override
+	public boolean isNullAt(int pos) {
+		assertIndexIsValid(pos);
+		return BinarySegmentUtils.bitGet(segments, offset + 4, pos);
+	}
+
+	public void setNullAt(int pos) {
+		assertIndexIsValid(pos);
+		BinarySegmentUtils.bitSet(segments, offset + 4, pos);
+	}
+
+	public void setNotNullAt(int pos) {
+		assertIndexIsValid(pos);
+		BinarySegmentUtils.bitUnSet(segments, offset + 4, pos);
+	}
+
+	@Override
+	public long getLong(int pos) {
+		assertIndexIsValid(pos);
+		return BinarySegmentUtils.getLong(segments, getElementOffset(pos, 8));
+	}
+
+	public void setLong(int pos, long value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		BinarySegmentUtils.setLong(segments, getElementOffset(pos, 8), value);
+	}
+
+	public void setNullLong(int pos) {
+		assertIndexIsValid(pos);
+		BinarySegmentUtils.bitSet(segments, offset + 4, pos);
+		BinarySegmentUtils.setLong(segments, getElementOffset(pos, 8), 0L);
+	}
+
+	@Override
+	public int getInt(int pos) {
+		assertIndexIsValid(pos);
+		return BinarySegmentUtils.getInt(segments, getElementOffset(pos, 4));
+	}
+
+	public void setInt(int pos, int value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		BinarySegmentUtils.setInt(segments, getElementOffset(pos, 4), value);
+	}
+
+	public void setNullInt(int pos) {
+		assertIndexIsValid(pos);
+		BinarySegmentUtils.bitSet(segments, offset + 4, pos);
+		BinarySegmentUtils.setInt(segments, getElementOffset(pos, 4), 0);
+	}
+
+	@Override
+	public StringData getString(int pos) {
+		assertIndexIsValid(pos);
+		int fieldOffset = getElementOffset(pos, 8);
+		final long offsetAndSize = BinarySegmentUtils.getLong(segments, fieldOffset);
+		return BinarySegmentUtils.readStringData(
+				segments, offset, fieldOffset, offsetAndSize);
+	}
+
+	@Override
+	public DecimalData getDecimal(int pos, int precision, int scale) {
+		assertIndexIsValid(pos);
+		if (DecimalData.isCompact(precision)) {
+			return DecimalData.fromUnscaledLong(
+					BinarySegmentUtils.getLong(segments, getElementOffset(pos, 8)),
+					precision,
+					scale);
+		}
+
+		int fieldOffset = getElementOffset(pos, 8);
+		final long offsetAndSize = BinarySegmentUtils.getLong(segments, fieldOffset);
+		return BinarySegmentUtils.readDecimalData(segments, offset, offsetAndSize, precision, scale);
+	}
+
+	@Override
+	public TimestampData getTimestamp(int pos, int precision) {
+		assertIndexIsValid(pos);
+
+		if (TimestampData.isCompact(precision)) {
+			return TimestampData.fromEpochMillis(
+					BinarySegmentUtils.getLong(segments, getElementOffset(pos, 8)));
+		}
+
+		int fieldOffset = getElementOffset(pos, 8);
+		final long offsetAndNanoOfMilli = BinarySegmentUtils.getLong(segments, fieldOffset);
+		return BinarySegmentUtils.readTimestampData(segments, offset, offsetAndNanoOfMilli);
+	}
+
+	@Override
+	public <T> RawValueData<T> getRawValue(int pos) {
+		assertIndexIsValid(pos);
+		int fieldOffset = getElementOffset(pos, 8);
+		final long offsetAndSize = BinarySegmentUtils.getLong(segments, fieldOffset);
+		return BinarySegmentUtils.readRawValueData(segments, offset, offsetAndSize);
+	}
+
+	@Override
+	public byte[] getBinary(int pos) {
+		assertIndexIsValid(pos);
+		int fieldOffset = getElementOffset(pos, 8);
+		final long offsetAndSize = BinarySegmentUtils.getLong(segments, fieldOffset);
+		return BinarySegmentUtils.readBinary(
+				segments, offset, fieldOffset, offsetAndSize);
+	}
+
+	@Override
+	public ArrayData getArray(int pos) {
+		assertIndexIsValid(pos);
+		return BinarySegmentUtils.readArrayData(segments, offset, getLong(pos));
+	}
+
+	@Override
+	public MapData getMap(int pos) {
+		assertIndexIsValid(pos);
+		return BinarySegmentUtils.readMapData(segments, offset, getLong(pos));
+	}
+
+	@Override
+	public RowData getRow(int pos, int numFields) {
+		assertIndexIsValid(pos);
+		int fieldOffset = getElementOffset(pos, 8);
+		final long offsetAndSize = BinarySegmentUtils.getLong(segments, fieldOffset);
+		return BinarySegmentUtils.readRowData(segments, numFields, offset, offsetAndSize);
+	}
+
+	@Override
+	public boolean getBoolean(int pos) {
+		assertIndexIsValid(pos);
+		return BinarySegmentUtils.getBoolean(segments, getElementOffset(pos, 1));
+	}
+
+	public void setBoolean(int pos, boolean value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		BinarySegmentUtils.setBoolean(segments, getElementOffset(pos, 1), value);
+	}
+
+	public void setNullBoolean(int pos) {
+		assertIndexIsValid(pos);
+		BinarySegmentUtils.bitSet(segments, offset + 4, pos);
+		BinarySegmentUtils.setBoolean(segments, getElementOffset(pos, 1), false);
+	}
+
+	@Override
+	public byte getByte(int pos) {
+		assertIndexIsValid(pos);
+		return BinarySegmentUtils.getByte(segments, getElementOffset(pos, 1));
+	}
+
+	public void setByte(int pos, byte value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		BinarySegmentUtils.setByte(segments, getElementOffset(pos, 1), value);
+	}
+
+	public void setNullByte(int pos) {
+		assertIndexIsValid(pos);
+		BinarySegmentUtils.bitSet(segments, offset + 4, pos);
+		BinarySegmentUtils.setByte(segments, getElementOffset(pos, 1), (byte) 0);
+	}
+
+	@Override
+	public short getShort(int pos) {
+		assertIndexIsValid(pos);
+		return BinarySegmentUtils.getShort(segments, getElementOffset(pos, 2));
+	}
+
+	public void setShort(int pos, short value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		BinarySegmentUtils.setShort(segments, getElementOffset(pos, 2), value);
+	}
+
+	public void setNullShort(int pos) {
+		assertIndexIsValid(pos);
+		BinarySegmentUtils.bitSet(segments, offset + 4, pos);
+		BinarySegmentUtils.setShort(segments, getElementOffset(pos, 2), (short) 0);
+	}
+
+	@Override
+	public float getFloat(int pos) {
+		assertIndexIsValid(pos);
+		return BinarySegmentUtils.getFloat(segments, getElementOffset(pos, 4));
+	}
+
+	public void setFloat(int pos, float value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		BinarySegmentUtils.setFloat(segments, getElementOffset(pos, 4), value);
+	}
+
+	public void setNullFloat(int pos) {
+		assertIndexIsValid(pos);
+		BinarySegmentUtils.bitSet(segments, offset + 4, pos);
+		BinarySegmentUtils.setFloat(segments, getElementOffset(pos, 4), 0F);
+	}
+
+	@Override
+	public double getDouble(int pos) {
+		assertIndexIsValid(pos);
+		return BinarySegmentUtils.getDouble(segments, getElementOffset(pos, 8));
+	}
+
+	public void setDouble(int pos, double value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		BinarySegmentUtils.setDouble(segments, getElementOffset(pos, 8), value);
+	}
+
+	public void setNullDouble(int pos) {
+		assertIndexIsValid(pos);
+		BinarySegmentUtils.bitSet(segments, offset + 4, pos);
+		BinarySegmentUtils.setDouble(segments, getElementOffset(pos, 8), 0.0);
+	}
+
+	public void setDecimal(int pos, DecimalData value, int precision) {
+		assertIndexIsValid(pos);
+
+		if (DecimalData.isCompact(precision)) {
+			// compact format
+			setLong(pos, value.toUnscaledLong());
+		} else {
+			int fieldOffset = getElementOffset(pos, 8);
+			int cursor = (int) (BinarySegmentUtils.getLong(segments, fieldOffset) >>> 32);
+			assert cursor > 0 : "invalid cursor " + cursor;
+			// zero-out the bytes
+			BinarySegmentUtils.setLong(segments, offset + cursor, 0L);
+			BinarySegmentUtils.setLong(segments, offset + cursor + 8, 0L);
+
+			if (value == null) {
+				setNullAt(pos);
+				// keep the offset for future update
+				BinarySegmentUtils.setLong(segments, fieldOffset, ((long) cursor) << 32);
+			} else {
+
+				byte[] bytes = value.toUnscaledBytes();
+				assert (bytes.length <= 16);
+
+				// Write the bytes to the variable length portion.
+				BinarySegmentUtils.copyFromBytes(segments, offset + cursor, bytes, 0, bytes.length);
+				setLong(pos, ((long) cursor << 32) | ((long) bytes.length));
+			}
+		}
+	}
+
+	public void setTimestamp(int pos, TimestampData value, int precision) {
+		assertIndexIsValid(pos);
+
+		if (TimestampData.isCompact(precision)) {
+			setLong(pos, value.getMillisecond());
+		} else {
+			int fieldOffset = getElementOffset(pos, 8);
+			int cursor = (int) (BinarySegmentUtils.getLong(segments, fieldOffset) >>> 32);
+			assert cursor > 0 : "invalid cursor " + cursor;
+
+			if (value == null) {
+				setNullAt(pos);
+				// zero-out the bytes
+				BinarySegmentUtils.setLong(segments, offset + cursor, 0L);
+				// keep the offset for future update
+				BinarySegmentUtils.setLong(segments, fieldOffset, ((long) cursor) << 32);
+			} else {
+				// write millisecond to the variable length portion.
+				BinarySegmentUtils.setLong(segments, offset + cursor, value.getMillisecond());
+				// write nanoOfMillisecond to the fixed-length portion.
+				setLong(pos, ((long) cursor << 32) | (long) value.getNanoOfMillisecond());
+			}
+		}
+	}
+
+	public boolean anyNull() {
+		for (int i = offset + 4; i < elementOffset; i += 4) {
+			if (BinarySegmentUtils.getInt(segments, i) != 0) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+	private void checkNoNull() {
+		if (anyNull()) {
+			throw new RuntimeException("Array can not have null value!");
+		}
+	}
+
+	@Override
+	public boolean[] toBooleanArray() {
+		checkNoNull();
+		boolean[] values = new boolean[size];
+		BinarySegmentUtils.copyToUnsafe(
+				segments, elementOffset, values, BOOLEAN_ARRAY_OFFSET, size);
+		return values;
+	}
+
+	@Override
+	public byte[] toByteArray() {
+		checkNoNull();
+		byte[] values = new byte[size];
+		BinarySegmentUtils.copyToUnsafe(
+				segments, elementOffset, values, BYTE_ARRAY_BASE_OFFSET, size);
+		return values;
+	}
+
+	@Override
+	public short[] toShortArray() {
+		checkNoNull();
+		short[] values = new short[size];
+		BinarySegmentUtils.copyToUnsafe(
+				segments, elementOffset, values, SHORT_ARRAY_OFFSET, size * 2);
+		return values;
+	}
+
+	@Override
+	public int[] toIntArray() {
+		checkNoNull();
+		int[] values = new int[size];
+		BinarySegmentUtils.copyToUnsafe(
+				segments, elementOffset, values, INT_ARRAY_OFFSET, size * 4);
+		return values;
+	}
+
+	@Override
+	public long[] toLongArray() {
+		checkNoNull();
+		long[] values = new long[size];
+		BinarySegmentUtils.copyToUnsafe(
+				segments, elementOffset, values, LONG_ARRAY_OFFSET, size * 8);
+		return values;
+	}
+
+	@Override
+	public float[] toFloatArray() {
+		checkNoNull();
+		float[] values = new float[size];
+		BinarySegmentUtils.copyToUnsafe(
+				segments, elementOffset, values, FLOAT_ARRAY_OFFSET, size * 4);
+		return values;
+	}
+
+	@Override
+	public double[] toDoubleArray() {
+		checkNoNull();
+		double[] values = new double[size];
+		BinarySegmentUtils.copyToUnsafe(
+				segments, elementOffset, values, DOUBLE_ARRAY_OFFSET, size * 8);
+		return values;
+	}
+
+	@SuppressWarnings("unchecked")
+	public <T> T[] toObjectArray(LogicalType elementType) {
+		Class<T> elementClass = (Class<T>) LogicalTypeUtils.toInternalConversionClass(elementType);
+		T[] values = (T[]) Array.newInstance(elementClass, size);
+		for (int i = 0; i < size; i++) {
+			if (!isNullAt(i)) {
+				values[i] = (T) ArrayData.get(this, i, elementType);
+			}
+		}
+		return values;
+	}
+
+	public BinaryArrayData copy() {
+		return copy(new BinaryArrayData());
+	}
+
+	public BinaryArrayData copy(BinaryArrayData reuse) {
+		byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, sizeInBytes);
+		reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, sizeInBytes);
+		return reuse;
+	}
+
+	@Override
+	public int hashCode() {
+		return BinarySegmentUtils.hashByWords(segments, offset, sizeInBytes);
+	}
+
+	// ------------------------------------------------------------------------------------------
+	// Construction Utilities
+	// ------------------------------------------------------------------------------------------
+
+	public static BinaryArrayData fromPrimitiveArray(boolean[] arr) {
+		return fromPrimitiveArray(arr, BOOLEAN_ARRAY_OFFSET, arr.length, 1);
+	}
+
+	public static BinaryArrayData fromPrimitiveArray(byte[] arr) {
+		return fromPrimitiveArray(arr, BYTE_ARRAY_BASE_OFFSET, arr.length, 1);
+	}
+
+	public static BinaryArrayData fromPrimitiveArray(short[] arr) {
+		return fromPrimitiveArray(arr, SHORT_ARRAY_OFFSET, arr.length, 2);
+	}
+
+	public static BinaryArrayData fromPrimitiveArray(int[] arr) {
+		return fromPrimitiveArray(arr, INT_ARRAY_OFFSET, arr.length, 4);
+	}
+
+	public static BinaryArrayData fromPrimitiveArray(long[] arr) {
+		return fromPrimitiveArray(arr, LONG_ARRAY_OFFSET, arr.length, 8);
+	}
+
+	public static BinaryArrayData fromPrimitiveArray(float[] arr) {
+		return fromPrimitiveArray(arr, FLOAT_ARRAY_OFFSET, arr.length, 4);
+	}
+
+	public static BinaryArrayData fromPrimitiveArray(double[] arr) {
+		return fromPrimitiveArray(arr, DOUBLE_ARRAY_OFFSET, arr.length, 8);
+	}
+
+	private static BinaryArrayData fromPrimitiveArray(
+			Object arr, int offset, int length, int elementSize) {
+		final long headerInBytes = calculateHeaderInBytes(length);
+		final long valueRegionInBytes = (long) elementSize * length;
+
+		// sizes are computed as long so they cannot overflow int; total size must align by 8 bytes
+		long totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8;
+		if (totalSizeInLongs > Integer.MAX_VALUE / 8) {
+			throw new UnsupportedOperationException("Cannot convert this array to unsafe format as " +
+					"it's too big.");
+		}
+		long totalSize = totalSizeInLongs * 8;
+
+		final byte[] data = new byte[(int) totalSize];
+
+		UNSAFE.putInt(data, (long) BYTE_ARRAY_BASE_OFFSET, length);
+		UNSAFE.copyMemory(
+				arr, offset, data, BYTE_ARRAY_BASE_OFFSET + headerInBytes, valueRegionInBytes);
+
+		BinaryArrayData result = new BinaryArrayData();
+		result.pointTo(MemorySegmentFactory.wrap(data), 0, (int) totalSize);
+		return result;
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryFormat.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryFormat.java
new file mode 100644
index 0000000..65affe7
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryFormat.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.binary;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * Binary format spanning {@link MemorySegment}s.
+ */
+@Internal
+public interface BinaryFormat {
+
+	/**
+	 * Decides whether to put data in FixLenPart or VarLenPart. See more in {@link BinaryRowData}.
+	 *
+	 * <p>If len is less than 8, its binary format is:
+	 * 1-bit mark(1) = 1, 7-bits len, and 7-bytes data.
+	 * Data is stored in fix-length part.
+	 *
+	 * <p>If len is greater than or equal to 8, its binary format is:
+	 * 1-bit mark(1) = 0, 31-bits offset to the data, and 4-bytes length of data.
+	 * Data is stored in variable-length part.
+	 */
+	int MAX_FIX_PART_DATA_SIZE = 7;
+	/**
+	 * Mask selecting the mark stored in the highest bit of a long.
+	 * Form: 10000000 00000000 ... (8 bytes)
+	 *
+	 * <p>This is used to decide whether the data is stored in fixed-length part or variable-length
+	 * part. See {@link #MAX_FIX_PART_DATA_SIZE} for more information.
+	 */
+	long HIGHEST_FIRST_BIT = 0x80L << 56;
+	/**
+	 * Mask selecting the 7-bit length stored in the second to eighth highest bits of a long.
+	 * Form: 01111111 00000000 ... (8 bytes)
+	 *
+	 * <p>This is used to get the length of the data which is stored in this long.
+	 * See {@link #MAX_FIX_PART_DATA_SIZE} for more information.
+	 */
+	long HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7FL << 56;
+
+	/**
+	 * Gets the underlying {@link MemorySegment}s this binary format spans.
+	 */
+	MemorySegment[] getSegments();
+
+	/**
+	 * Gets the start offset of this binary data in the {@link MemorySegment}s.
+	 */
+	int getOffset();
+
+	/**
+	 * Gets the size in bytes of this binary data.
+	 */
+	int getSizeInBytes();
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryMapData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryMapData.java
new file mode 100644
index 0000000..fdad44c
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryMapData.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.binary;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Binary layout: [4 byte(keyArray size in bytes)] + [Key BinaryArray] + [Value BinaryArray].
+ *
+ * <p>{@code BinaryMapData} is influenced by Apache Spark UnsafeMapData.
+ */
+@Internal
+public final class BinaryMapData extends BinarySection implements MapData {
+
+	private final BinaryArrayData keys;
+	private final BinaryArrayData values;
+
+	public BinaryMapData() {
+		keys = new BinaryArrayData();
+		values = new BinaryArrayData();
+	}
+
+	public int size() {
+		return keys.size();
+	}
+
+	@Override
+	public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) {
+		// Read the numBytes of key array from the first 4 bytes.
+		final int keyArrayBytes = BinarySegmentUtils.getInt(segments, offset);
+		assert keyArrayBytes >= 0 : "keyArraySize (" + keyArrayBytes + ") should >= 0";
+		final int valueArrayBytes = sizeInBytes - keyArrayBytes - 4;
+		assert valueArrayBytes >= 0 : "valueArraySize (" + valueArrayBytes + ") should >= 0";
+
+		keys.pointTo(segments, offset + 4, keyArrayBytes);
+		values.pointTo(segments, offset + 4 + keyArrayBytes, valueArrayBytes);
+
+		assert keys.size() == values.size();
+
+		this.segments = segments;
+		this.offset = offset;
+		this.sizeInBytes = sizeInBytes;
+	}
+
+	public BinaryArrayData keyArray() {
+		return keys;
+	}
+
+	public BinaryArrayData valueArray() {
+		return values;
+	}
+
+	public Map<?, ?> toJavaMap(LogicalType keyType, LogicalType valueType) {
+		Object[] keyArray = keys.toObjectArray(keyType);
+		Object[] valueArray = values.toObjectArray(valueType);
+
+		Map<Object, Object> map = new HashMap<>();
+		for (int i = 0; i < keyArray.length; i++) {
+			map.put(keyArray[i], valueArray[i]);
+		}
+		return map;
+	}
+
+	public BinaryMapData copy() {
+		return copy(new BinaryMapData());
+	}
+
+	public BinaryMapData copy(BinaryMapData reuse) {
+		byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, sizeInBytes);
+		reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, sizeInBytes);
+		return reuse;
+	}
+
+	@Override
+	public int hashCode() {
+		return BinarySegmentUtils.hashByWords(segments, offset, sizeInBytes);
+	}
+
+	// ------------------------------------------------------------------------------------------
+	// Construction Utilities
+	// ------------------------------------------------------------------------------------------
+
+	public static BinaryMapData valueOf(BinaryArrayData key, BinaryArrayData value) {
+		checkArgument(key.getSegments().length == 1 && value.getSegments().length == 1);
+		byte[] bytes = new byte[4 + key.sizeInBytes + value.sizeInBytes];
+		MemorySegment segment = MemorySegmentFactory.wrap(bytes);
+		segment.putInt(0, key.sizeInBytes);
+		key.getSegments()[0].copyTo(key.getOffset(), segment, 4, key.sizeInBytes);
+		value.getSegments()[0].copyTo(
+				value.getOffset(), segment, 4 + key.sizeInBytes, value.sizeInBytes);
+		BinaryMapData map = new BinaryMapData();
+		map.pointTo(segment, 0, bytes.length);
+		return map;
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRawValueData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRawValueData.java
new file mode 100644
index 0000000..ff5921b
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRawValueData.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.binary;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+
+/**
+ * A lazy binary implementation of {@link RawValueData} which is backed by {@link MemorySegment}s
+ * and generic {@link Object}.
+ *
+ * <p>Either {@link MemorySegment}s or {@link Object} must be provided when
+ * constructing {@link BinaryRawValueData}. The other representation will be materialized when needed.
+ *
+ * @param <T> the java type of the raw value.
+ */
+@Internal
+public final class BinaryRawValueData<T> extends LazyBinaryFormat<T> implements RawValueData<T> {
+
+	public BinaryRawValueData(T javaObject) {
+		super(javaObject);
+	}
+
+	public BinaryRawValueData(MemorySegment[] segments, int offset, int sizeInBytes) {
+		super(segments, offset, sizeInBytes);
+	}
+
+	public BinaryRawValueData(MemorySegment[] segments, int offset, int sizeInBytes, T javaObject) {
+		super(segments, offset, sizeInBytes, javaObject);
+	}
+
+	// ------------------------------------------------------------------------------------------
+	// Public Interfaces
+	// ------------------------------------------------------------------------------------------
+
+	@Override
+	public T toObject(TypeSerializer<T> serializer) {
+		if (javaObject == null) {
+			try {
+				javaObject = InstantiationUtil.deserializeFromByteArray(
+						serializer,
+						toBytes(serializer));
+			} catch (IOException e) {
+				throw new FlinkRuntimeException(e);
+			}
+		}
+		return javaObject;
+	}
+
+	@Override
+	public byte[] toBytes(TypeSerializer<T> serializer) {
+		ensureMaterialized(serializer);
+		return BinarySegmentUtils.copyToBytes(getSegments(), getOffset(), getSizeInBytes());
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		throw new UnsupportedOperationException("BinaryRawValueData cannot be compared");
+	}
+
+	@Override
+	public int hashCode() {
+		throw new UnsupportedOperationException("BinaryRawValueData does not have a hashCode");
+	}
+
+	@Override
+	public String toString() {
+		return String.format("SqlRawValue{%s}", javaObject == null ? "?" : javaObject);
+	}
+
+	// ------------------------------------------------------------------------------------
+	// Internal methods
+	// ------------------------------------------------------------------------------------
+
+	@Override
+	protected BinarySection materialize(TypeSerializer<T> serializer) {
+		try {
+			byte[] bytes = InstantiationUtil.serializeToByteArray(serializer, javaObject);
+			return new BinarySection(new MemorySegment[] {MemorySegmentFactory.wrap(bytes)}, 0, bytes.length);
+		} catch (IOException e) {
+			throw new FlinkRuntimeException(e);
+		}
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRowData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRowData.java
new file mode 100644
index 0000000..d2ad35c
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRowData.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.data.binary; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.types.RowKind; + +import java.nio.ByteOrder; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * An implementation of {@link RowData} which is backed by {@link MemorySegment} instead of Object. + * It can significantly reduce the serialization/deserialization of Java objects. + * + * <p>A Row has two part: Fixed-length part and variable-length part. + * + * <p>Fixed-length part contains 1 byte header and null bit set and field values. Null bit set is + * used for null tracking and is aligned to 8-byte word boundaries. `Field values` holds + * fixed-length primitive types and variable-length values which can be stored in 8 bytes inside. 
* If a value does not fit into its 8-byte slot, the slot instead holds the offset of the value
 * within the variable-length part, combined with its length (or, for non-compact timestamps, with
 * the nano-of-millisecond).
 *
 * <p>The fixed-length part always falls into a single MemorySegment, which speeds up reading and
 * writing fields. During the write phase, if the target memory segment has less space left than
 * the fixed-length part needs, that space is skipped. Hence the fields of a single row cannot
 * exceed the capacity of a single MemorySegment; for rows with very many fields, configure a
 * bigger MemorySegment page size.
 *
 * <p>The variable-length part may span multiple MemorySegments.
 */
@Internal
public final class BinaryRowData extends BinarySection implements RowData, TypedSetters {

    /** Whether the platform's native byte order is little-endian. */
    public static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);

    /**
     * Mask, selected by {@link #LITTLE_ENDIAN}, that clears the byte at the lowest address of a word
     * read via {@code getLong}. Applied to the first word of the null bit set so that the header
     * byte is ignored.
     */
    private static final long FIRST_BYTE_ZERO = LITTLE_ENDIAN ? ~0xFFL : ~(0xFFL << 56L);

    /** Number of leading bits of the null bit set that are reserved for the header byte. */
    public static final int HEADER_SIZE_IN_BITS = 8;

    /**
     * Returns the width in bytes of the header plus null bit set of a row with the given arity,
     * rounded up to a multiple of 8 bytes.
     */
    public static int calculateBitSetWidthInBytes(int arity) {
        return ((arity + 63 + HEADER_SIZE_IN_BITS) / 64) * 8;
    }

    /**
     * Returns the size in bytes of the fixed-length part: header and null bit set plus one 8-byte
     * slot per field.
     */
    public static int calculateFixPartSizeInBytes(int arity) {
        return calculateBitSetWidthInBytes(arity) + 8 * arity;
    }

    /**
     * Returns whether a value of the given type is stored entirely in its 8-byte fixed-length slot.
     *
     * <p>For such fields, this BinaryRowData's setXX methods can be used for in-place updates.
     * Variable-length fields cannot be updated in place, because the underlying data is stored
     * contiguously.
     */
    public static boolean isInFixedLengthPart(LogicalType type) {
        switch (type.getTypeRoot()) {
            case BOOLEAN:
            case TINYINT:
            case SMALLINT:
            case INTEGER:
            case DATE:
            case TIME_WITHOUT_TIME_ZONE:
            case INTERVAL_YEAR_MONTH:
            case BIGINT:
            case INTERVAL_DAY_TIME:
            case FLOAT:
            case DOUBLE:
                return true;
            case DECIMAL:
                return DecimalData.isCompact(((DecimalType) type).getPrecision());
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return TimestampData.isCompact(((TimestampType) type).getPrecision());
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                return TimestampData.isCompact(((LocalZonedTimestampType) type).getPrecision());
            default:
                return false;
        }
    }

    /**
     * Returns whether a field of the given type may be updated in place: every fixed-length type,
     * plus DECIMAL of any precision (non-compact decimals are rewritten into the region of at most
     * 16 bytes that the existing slot points to, see {@link #setDecimal}).
     */
    public static boolean isMutable(LogicalType type) {
        return isInFixedLengthPart(type) || type.getTypeRoot() == LogicalTypeRoot.DECIMAL;
    }

    private final int arity;
    private final int nullBitsSizeInBytes;

    public BinaryRowData(int arity) {
        checkArgument(arity >= 0);
        this.arity = arity;
        this.nullBitsSizeInBytes = calculateBitSetWidthInBytes(arity);
    }

    /** Absolute position of the 8-byte slot of field {@code pos} within {@code segments[0]}. */
    private int getFieldOffset(int pos) {
        return offset + nullBitsSizeInBytes + pos * 8;
    }

    private void assertIndexIsValid(int index) {
        assert index >= 0 : "index (" + index + ") should >= 0";
        assert index < arity : "index (" + index + ") should < " + arity;
    }

    /** Returns the size in bytes of the fixed-length part of this row. */
    public int getFixedLengthPartSize() {
        return nullBitsSizeInBytes + 8 * arity;
    }

    @Override
    public int getArity() {
        return arity;
    }

    /** The row kind is stored in the header byte at the very start of the row. */
    @Override
    public RowKind getRowKind() {
        byte kindValue = segments[0].get(offset);
        return RowKind.fromByteValue(kindValue);
    }

    @Override
    public void setRowKind(RowKind kind) {
        segments[0].put(offset, kind.toByteValue());
    }

    public void setTotalSize(int sizeInBytes) {
        this.sizeInBytes = sizeInBytes;
    }

    @Override
    public boolean isNullAt(int pos) {
        assertIndexIsValid(pos);
        return BinarySegmentUtils.bitGet(segments[0], offset, pos + HEADER_SIZE_IN_BITS);
    }

    private void setNotNullAt(int i) {
        assertIndexIsValid(i);
        BinarySegmentUtils.bitUnSet(segments[0], offset, i + HEADER_SIZE_IN_BITS);
    }

    @Override
    public void setNullAt(int i) {
        assertIndexIsValid(i);
        BinarySegmentUtils.bitSet(segments[0], offset, i + HEADER_SIZE_IN_BITS);
        // We must set the fixed length part zero.
        // 1.Only int/long/boolean...(Fix length type) will invoke this setNullAt.
        // 2.Set to zero in order to equals and hash operation bytes calculation.
        segments[0].putLong(getFieldOffset(i), 0);
    }

    @Override
    public void setInt(int pos, int value) {
        assertIndexIsValid(pos);
        setNotNullAt(pos);
        segments[0].putInt(getFieldOffset(pos), value);
    }

    @Override
    public void setLong(int pos, long value) {
        assertIndexIsValid(pos);
        setNotNullAt(pos);
        segments[0].putLong(getFieldOffset(pos), value);
    }

    @Override
    public void setDouble(int pos, double value) {
        assertIndexIsValid(pos);
        setNotNullAt(pos);
        segments[0].putDouble(getFieldOffset(pos), value);
    }

    /**
     * Sets a decimal value in place.
     *
     * <p>Compact decimals are stored as their unscaled long in the fixed-length slot; a {@code null}
     * value simply marks the field as null. Non-compact decimals are written into the region that
     * the existing slot points to (at most 16 bytes); for a {@code null} value that region is zeroed
     * and its offset is kept in the slot for future updates.
     */
    @Override
    public void setDecimal(int pos, DecimalData value, int precision) {
        assertIndexIsValid(pos);

        if (DecimalData.isCompact(precision)) {
            if (value == null) {
                // the whole value lives in the fixed-length slot, so plain null marking suffices
                setNullAt(pos);
            } else {
                // compact format
                setLong(pos, value.toUnscaledLong());
            }
        } else {
            int fieldOffset = getFieldOffset(pos);
            int cursor = (int) (segments[0].getLong(fieldOffset) >>> 32);
            assert cursor > 0 : "invalid cursor " + cursor;
            // zero-out the bytes
            BinarySegmentUtils.setLong(segments, offset + cursor, 0L);
            BinarySegmentUtils.setLong(segments, offset + cursor + 8, 0L);

            if (value == null) {
                setNullAt(pos);
                // keep the offset for future update
                segments[0].putLong(fieldOffset, ((long) cursor) << 32);
            } else {

                byte[] bytes = value.toUnscaledBytes();
                assert bytes.length <= 16;

                // Write the bytes to the variable length portion.
                BinarySegmentUtils.copyFromBytes(segments, offset + cursor, bytes, 0, bytes.length);
                setLong(pos, ((long) cursor << 32) | ((long) bytes.length));
            }
        }
    }

    /**
     * Sets a timestamp value in place.
     *
     * <p>Compact timestamps are stored as epoch milliseconds in the fixed-length slot; a {@code
     * null} value simply marks the field as null. For non-compact timestamps the milliseconds go to
     * the variable-length region the existing slot points to, while the slot keeps that offset
     * together with the nano-of-millisecond.
     */
    @Override
    public void setTimestamp(int pos, TimestampData value, int precision) {
        assertIndexIsValid(pos);

        if (TimestampData.isCompact(precision)) {
            if (value == null) {
                // the whole value lives in the fixed-length slot, so plain null marking suffices
                setNullAt(pos);
            } else {
                setLong(pos, value.getMillisecond());
            }
        } else {
            int fieldOffset = getFieldOffset(pos);
            int cursor = (int) (segments[0].getLong(fieldOffset) >>> 32);
            assert cursor > 0 : "invalid cursor " + cursor;

            if (value == null) {
                setNullAt(pos);
                // zero-out the bytes
                BinarySegmentUtils.setLong(segments, offset + cursor, 0L);
                // keep the offset for future update
                segments[0].putLong(fieldOffset, ((long) cursor) << 32);
            } else {
                // write millisecond to the variable length portion.
                BinarySegmentUtils.setLong(segments, offset + cursor, value.getMillisecond());
                // write nanoOfMillisecond to the fixed-length portion.
                setLong(pos, ((long) cursor << 32) | (long) value.getNanoOfMillisecond());
            }
        }
    }

    @Override
    public void setBoolean(int pos, boolean value) {
        assertIndexIsValid(pos);
        setNotNullAt(pos);
        segments[0].putBoolean(getFieldOffset(pos), value);
    }

    @Override
    public void setShort(int pos, short value) {
        assertIndexIsValid(pos);
        setNotNullAt(pos);
        segments[0].putShort(getFieldOffset(pos), value);
    }

    @Override
    public void setByte(int pos, byte value) {
        assertIndexIsValid(pos);
        setNotNullAt(pos);
        segments[0].put(getFieldOffset(pos), value);
    }

    @Override
    public void setFloat(int pos, float value) {
        assertIndexIsValid(pos);
        setNotNullAt(pos);
        segments[0].putFloat(getFieldOffset(pos), value);
    }

    @Override
    public boolean getBoolean(int pos) {
        assertIndexIsValid(pos);
        return segments[0].getBoolean(getFieldOffset(pos));
    }

    @Override
    public byte getByte(int pos) {
        assertIndexIsValid(pos);
        return segments[0].get(getFieldOffset(pos));
    }

    @Override
    public short getShort(int pos) {
        assertIndexIsValid(pos);
        return segments[0].getShort(getFieldOffset(pos));
    }

    @Override
    public int getInt(int pos) {
        assertIndexIsValid(pos);
        return segments[0].getInt(getFieldOffset(pos));
    }

    @Override
    public long getLong(int pos) {
        assertIndexIsValid(pos);
        return segments[0].getLong(getFieldOffset(pos));
    }

    @Override
    public float getFloat(int pos) {
        assertIndexIsValid(pos);
        return segments[0].getFloat(getFieldOffset(pos));
    }

    @Override
    public double getDouble(int pos) {
        assertIndexIsValid(pos);
        return segments[0].getDouble(getFieldOffset(pos));
    }

    @Override
    public StringData getString(int pos) {
        assertIndexIsValid(pos);
        int fieldOffset = getFieldOffset(pos);
        final long offsetAndLen = segments[0].getLong(fieldOffset);
        return BinarySegmentUtils.readStringData(segments, offset, fieldOffset, offsetAndLen);
    }

    @Override
    public DecimalData getDecimal(int pos, int precision, int scale) {
        assertIndexIsValid(pos);

        if (DecimalData.isCompact(precision)) {
            return DecimalData.fromUnscaledLong(
                    segments[0].getLong(getFieldOffset(pos)),
                    precision,
                    scale);
        }

        int fieldOffset = getFieldOffset(pos);
        final long offsetAndSize = segments[0].getLong(fieldOffset);
        return BinarySegmentUtils.readDecimalData(segments, offset, offsetAndSize, precision, scale);
    }

    @Override
    public TimestampData getTimestamp(int pos, int precision) {
        assertIndexIsValid(pos);

        if (TimestampData.isCompact(precision)) {
            return TimestampData.fromEpochMillis(segments[0].getLong(getFieldOffset(pos)));
        }

        int fieldOffset = getFieldOffset(pos);
        final long offsetAndNanoOfMilli = segments[0].getLong(fieldOffset);
        return BinarySegmentUtils.readTimestampData(segments, offset, offsetAndNanoOfMilli);
    }

    @Override
    public <T> RawValueData<T> getRawValue(int pos) {
        assertIndexIsValid(pos);
        return BinarySegmentUtils.readRawValueData(segments, offset, getLong(pos));
    }

    @Override
    public byte[] getBinary(int pos) {
        assertIndexIsValid(pos);
        int fieldOffset = getFieldOffset(pos);
        final long offsetAndLen = segments[0].getLong(fieldOffset);
        return BinarySegmentUtils.readBinary(segments, offset, fieldOffset, offsetAndLen);
    }

    @Override
    public ArrayData getArray(int pos) {
        assertIndexIsValid(pos);
        return BinarySegmentUtils.readArrayData(segments, offset, getLong(pos));
    }

    @Override
    public MapData getMap(int pos) {
        assertIndexIsValid(pos);
        return BinarySegmentUtils.readMapData(segments, offset, getLong(pos));
    }

    @Override
    public RowData getRow(int pos, int numFields) {
        assertIndexIsValid(pos);
        return BinarySegmentUtils.readRowData(segments, numFields, offset, getLong(pos));
    }

    /**
     * Returns whether any field of this row is null.
     *
     * <p>A null bit is 1 when the corresponding field is null and 0 otherwise. The header byte,
     * which shares the first word with the null bits, is masked out.
     */
    public boolean anyNull() {
        // Skip the header. Positions are relative to this row's offset, exactly like isNullAt;
        // reading from absolute position 0 would inspect unrelated bytes for rows with offset != 0.
        if ((segments[0].getLong(offset) & FIRST_BYTE_ZERO) != 0) {
            return true;
        }
        for (int i = 8; i < nullBitsSizeInBytes; i += 8) {
            if (segments[0].getLong(offset + i) != 0) {
                return true;
            }
        }
        return false;
    }

    /** Returns whether any of the given field positions is null. */
    public boolean anyNull(int[] fields) {
        for (int field : fields) {
            if (isNullAt(field)) {
                return true;
            }
        }
        return false;
    }

    /** Returns a deep copy of this row backed by a freshly allocated heap byte array. */
    public BinaryRowData copy() {
        return copy(new BinaryRowData(arity));
    }

    /** Copies this row's bytes into a new heap array and points {@code reuse} to it. */
    public BinaryRowData copy(BinaryRowData reuse) {
        return copyInternal(reuse);
    }

    private BinaryRowData copyInternal(BinaryRowData reuse) {
        byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, sizeInBytes);
        reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, sizeInBytes);
        return reuse;
    }

    /** Detaches this row from any memory. */
    public void clear() {
        segments = null;
        offset = 0;
        sizeInBytes = 0;
    }

    @Override
    public int hashCode() {
        return BinarySegmentUtils.hashByWords(segments, offset, sizeInBytes);
    }
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinarySection.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinarySection.java new file mode 100644 index 0000000..a2312b8 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinarySection.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.data.binary; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.memory.MemorySegment; + +/** + * A basic implementation of {@link BinaryFormat} which describes a section of memory.
+ */ +@Internal +public class BinarySection implements BinaryFormat { + + protected MemorySegment[] segments; + protected int offset; + protected int sizeInBytes; + + public BinarySection() {} + + public BinarySection(MemorySegment[] segments, int offset, int sizeInBytes) { + this.segments = segments; + this.offset = offset; + this.sizeInBytes = sizeInBytes; + } + + public final void pointTo(MemorySegment segment, int offset, int sizeInBytes) { + pointTo(new MemorySegment[] {segment}, offset, sizeInBytes); + } + + public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) { + this.segments = segments; + this.offset = offset; + this.sizeInBytes = sizeInBytes; + } + + public MemorySegment[] getSegments() { + return segments; + } + + public int getOffset() { + return offset; + } + + public int getSizeInBytes() { + return sizeInBytes; + } + + @Override + public boolean equals(Object o) { + return this == o || o != null && + getClass() == o.getClass() && + sizeInBytes == ((BinarySection) o).sizeInBytes && + BinarySegmentUtils.equals(segments, offset, ((BinarySection) o).segments, ((BinarySection) o).offset, sizeInBytes); + } + + @Override + public int hashCode() { + return BinarySegmentUtils.hash(segments, offset, sizeInBytes); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinarySegmentUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinarySegmentUtils.java new file mode 100644 index 0000000..e777bba --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinarySegmentUtils.java @@ -0,0 +1,1198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.data.binary; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; + +import java.io.IOException; +import java.nio.ByteOrder; + +import static org.apache.flink.core.memory.MemoryUtils.UNSAFE; +import static org.apache.flink.table.data.binary.BinaryFormat.HIGHEST_FIRST_BIT; +import static org.apache.flink.table.data.binary.BinaryFormat.HIGHEST_SECOND_TO_EIGHTH_BIT; + +/** + * Utilities for binary data segments which heavily uses {@link MemorySegment}. + */ +@Internal +public final class BinarySegmentUtils { + + /** + * Constant that flags the byte order. + */ + public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN; + + private static final int ADDRESS_BITS_PER_WORD = 3; + + private static final int BIT_BYTE_INDEX_MASK = 7; + + /** + * SQL execution threads is limited, not too many, so it can bear the overhead of 64K per thread. 
+ */ + private static final int MAX_BYTES_LENGTH = 1024 * 64; + private static final int MAX_CHARS_LENGTH = 1024 * 32; + + private static final int BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); + + private static final ThreadLocal<byte[]> BYTES_LOCAL = new ThreadLocal<>(); + private static final ThreadLocal<char[]> CHARS_LOCAL = new ThreadLocal<>(); + + private BinarySegmentUtils() { + // do not instantiate + } + + /** + * Allocate bytes that is only for temporary usage, it should not be stored in somewhere else. + * Use a {@link ThreadLocal} to reuse bytes to avoid overhead of byte[] new and gc. + * + * <p>If there are methods that can only accept a byte[], instead of a MemorySegment[] + * parameter, we can allocate a reuse bytes and copy the MemorySegment data to byte[], + * then call the method. Such as String deserialization. + */ + public static byte[] allocateReuseBytes(int length) { + byte[] bytes = BYTES_LOCAL.get(); + + if (bytes == null) { + if (length <= MAX_BYTES_LENGTH) { + bytes = new byte[MAX_BYTES_LENGTH]; + BYTES_LOCAL.set(bytes); + } else { + bytes = new byte[length]; + } + } else if (bytes.length < length) { + bytes = new byte[length]; + } + + return bytes; + } + + public static char[] allocateReuseChars(int length) { + char[] chars = CHARS_LOCAL.get(); + + if (chars == null) { + if (length <= MAX_CHARS_LENGTH) { + chars = new char[MAX_CHARS_LENGTH]; + CHARS_LOCAL.set(chars); + } else { + chars = new char[length]; + } + } else if (chars.length < length) { + chars = new char[length]; + } + + return chars; + } + + /** + * Copy segments to a new byte[]. + * + * @param segments Source segments. + * @param offset Source segments offset. + * @param numBytes the number bytes to copy. + */ + public static byte[] copyToBytes(MemorySegment[] segments, int offset, int numBytes) { + return copyToBytes(segments, offset, new byte[numBytes], 0, numBytes); + } + + /** + * Copy segments to target byte[]. + * + * @param segments Source segments. 
+ * @param offset Source segments offset. + * @param bytes target byte[]. + * @param bytesOffset target byte[] offset. + * @param numBytes the number bytes to copy. + */ + public static byte[] copyToBytes( + MemorySegment[] segments, + int offset, + byte[] bytes, + int bytesOffset, + int numBytes) { + if (inFirstSegment(segments, offset, numBytes)) { + segments[0].get(offset, bytes, bytesOffset, numBytes); + } else { + copyMultiSegmentsToBytes(segments, offset, bytes, bytesOffset, numBytes); + } + return bytes; + } + + public static void copyMultiSegmentsToBytes( + MemorySegment[] segments, + int offset, + byte[] bytes, + int bytesOffset, + int numBytes) { + int remainSize = numBytes; + for (MemorySegment segment : segments) { + int remain = segment.size() - offset; + if (remain > 0) { + int nCopy = Math.min(remain, remainSize); + segment.get(offset, bytes, numBytes - remainSize + bytesOffset, nCopy); + remainSize -= nCopy; + // next new segment. + offset = 0; + if (remainSize == 0) { + return; + } + } else { + // remain is negative, let's advance to next segment + // now the offset = offset - segmentSize (-remain) + offset = -remain; + } + } + } + + /** + * Copy segments to target unsafe pointer. + * + * @param segments Source segments. + * @param offset The position where the bytes are started to be read from these memory + * segments. + * @param target The unsafe memory to copy the bytes to. + * @param pointer The position in the target unsafe memory to copy the chunk to. + * @param numBytes the number bytes to copy. 
+ */ + public static void copyToUnsafe( + MemorySegment[] segments, + int offset, + Object target, + int pointer, + int numBytes) { + if (inFirstSegment(segments, offset, numBytes)) { + segments[0].copyToUnsafe(offset, target, pointer, numBytes); + } else { + copyMultiSegmentsToUnsafe(segments, offset, target, pointer, numBytes); + } + } + + private static void copyMultiSegmentsToUnsafe( + MemorySegment[] segments, + int offset, + Object target, + int pointer, + int numBytes) { + int remainSize = numBytes; + for (MemorySegment segment : segments) { + int remain = segment.size() - offset; + if (remain > 0) { + int nCopy = Math.min(remain, remainSize); + segment.copyToUnsafe(offset, target, numBytes - remainSize + pointer, nCopy); + remainSize -= nCopy; + // next new segment. + offset = 0; + if (remainSize == 0) { + return; + } + } else { + // remain is negative, let's advance to next segment + // now the offset = offset - segmentSize (-remain) + offset = -remain; + } + } + } + + /** + * Copy bytes of segments to output view. + * + * <p>Note: It just copies the data in, not include the length. 
+ * + * @param segments source segments + * @param offset offset for segments + * @param sizeInBytes size in bytes + * @param target target output view + */ + public static void copyToView( + MemorySegment[] segments, + int offset, + int sizeInBytes, + DataOutputView target) throws IOException { + for (MemorySegment sourceSegment : segments) { + int curSegRemain = sourceSegment.size() - offset; + if (curSegRemain > 0) { + int copySize = Math.min(curSegRemain, sizeInBytes); + + byte[] bytes = allocateReuseBytes(copySize); + sourceSegment.get(offset, bytes, 0, copySize); + target.write(bytes, 0, copySize); + + sizeInBytes -= copySize; + offset = 0; + } else { + offset -= sourceSegment.size(); + } + + if (sizeInBytes == 0) { + return; + } + } + + if (sizeInBytes != 0) { + throw new RuntimeException("No copy finished, this should be a bug, " + + "The remaining length is: " + sizeInBytes); + } + } + + /** + * Copy target segments from source byte[]. + * + * @param segments target segments. + * @param offset target segments offset. + * @param bytes source byte[]. + * @param bytesOffset source byte[] offset. + * @param numBytes the number bytes to copy. + */ + public static void copyFromBytes( + MemorySegment[] segments, + int offset, + byte[] bytes, + int bytesOffset, + int numBytes) { + if (segments.length == 1) { + segments[0].put(offset, bytes, bytesOffset, numBytes); + } else { + copyMultiSegmentsFromBytes(segments, offset, bytes, bytesOffset, numBytes); + } + } + + private static void copyMultiSegmentsFromBytes( + MemorySegment[] segments, + int offset, + byte[] bytes, + int bytesOffset, + int numBytes) { + int remainSize = numBytes; + for (MemorySegment segment : segments) { + int remain = segment.size() - offset; + if (remain > 0) { + int nCopy = Math.min(remain, remainSize); + segment.put(offset, bytes, numBytes - remainSize + bytesOffset, nCopy); + remainSize -= nCopy; + // next new segment. 
+ offset = 0; + if (remainSize == 0) { + return; + } + } else { + // remain is negative, let's advance to next segment + // now the offset = offset - segmentSize (-remain) + offset = -remain; + } + } + } + + /** + * Maybe not copied, if want copy, please use copyTo. + */ + public static byte[] getBytes(MemorySegment[] segments, int baseOffset, int sizeInBytes) { + // avoid copy if `base` is `byte[]` + if (segments.length == 1) { + byte[] heapMemory = segments[0].getHeapMemory(); + if (baseOffset == 0 + && heapMemory != null + && heapMemory.length == sizeInBytes) { + return heapMemory; + } else { + byte[] bytes = new byte[sizeInBytes]; + segments[0].get(baseOffset, bytes, 0, sizeInBytes); + return bytes; + } + } else { + byte[] bytes = new byte[sizeInBytes]; + copyMultiSegmentsToBytes(segments, baseOffset, bytes, 0, sizeInBytes); + return bytes; + } + } + + /** + * Equals two memory segments regions. + * + * @param segments1 Segments 1 + * @param offset1 Offset of segments1 to start equaling + * @param segments2 Segments 2 + * @param offset2 Offset of segments2 to start equaling + * @param len Length of the equaled memory region + * + * @return true if equal, false otherwise + */ + public static boolean equals( + MemorySegment[] segments1, int offset1, + MemorySegment[] segments2, int offset2, int len) { + if (inFirstSegment(segments1, offset1, len) && inFirstSegment(segments2, offset2, len)) { + return segments1[0].equalTo(segments2[0], offset1, offset2, len); + } else { + return equalsMultiSegments(segments1, offset1, segments2, offset2, len); + } + } + + @VisibleForTesting + static boolean equalsMultiSegments( + MemorySegment[] segments1, int offset1, + MemorySegment[] segments2, int offset2, int len) { + if (len == 0) { + // quick way and avoid segSize is zero. + return true; + } + + int segSize1 = segments1[0].size(); + int segSize2 = segments2[0].size(); + + // find first segIndex and segOffset of segments. 
+ int segIndex1 = offset1 / segSize1; + int segIndex2 = offset2 / segSize2; + int segOffset1 = offset1 - segSize1 * segIndex1; // equal to % + int segOffset2 = offset2 - segSize2 * segIndex2; // equal to % + + while (len > 0) { + int equalLen = Math.min(Math.min(len, segSize1 - segOffset1), segSize2 - segOffset2); + if (!segments1[segIndex1].equalTo(segments2[segIndex2], segOffset1, segOffset2, equalLen)) { + return false; + } + len -= equalLen; + segOffset1 += equalLen; + if (segOffset1 == segSize1) { + segOffset1 = 0; + segIndex1++; + } + segOffset2 += equalLen; + if (segOffset2 == segSize2) { + segOffset2 = 0; + segIndex2++; + } + } + return true; + } + + /** + * hash segments to int, numBytes must be aligned to 4 bytes. + * + * @param segments Source segments. + * @param offset Source segments offset. + * @param numBytes the number bytes to hash. + */ + public static int hashByWords(MemorySegment[] segments, int offset, int numBytes) { + if (inFirstSegment(segments, offset, numBytes)) { + return MurmurHashUtils.hashBytesByWords(segments[0], offset, numBytes); + } else { + return hashMultiSegByWords(segments, offset, numBytes); + } + } + + private static int hashMultiSegByWords(MemorySegment[] segments, int offset, int numBytes) { + byte[] bytes = allocateReuseBytes(numBytes); + copyMultiSegmentsToBytes(segments, offset, bytes, 0, numBytes); + return MurmurHashUtils.hashUnsafeBytesByWords(bytes, BYTE_ARRAY_BASE_OFFSET, numBytes); + } + + /** + * hash segments to int. + * + * @param segments Source segments. + * @param offset Source segments offset. + * @param numBytes the number bytes to hash. 
+ */ + public static int hash(MemorySegment[] segments, int offset, int numBytes) { + if (inFirstSegment(segments, offset, numBytes)) { + return MurmurHashUtils.hashBytes(segments[0], offset, numBytes); + } else { + return hashMultiSeg(segments, offset, numBytes); + } + } + + private static int hashMultiSeg(MemorySegment[] segments, int offset, int numBytes) { + byte[] bytes = allocateReuseBytes(numBytes); + copyMultiSegmentsToBytes(segments, offset, bytes, 0, numBytes); + return MurmurHashUtils.hashUnsafeBytes(bytes, BYTE_ARRAY_BASE_OFFSET, numBytes); + } + + /** + * Is it just in first MemorySegment, we use quick way to do something. + */ + private static boolean inFirstSegment(MemorySegment[] segments, int offset, int numBytes) { + return numBytes + offset <= segments[0].size(); + } + + /** + * Given a bit index, return the byte index containing it. + * @param bitIndex the bit index. + * @return the byte index. + */ + private static int byteIndex(int bitIndex) { + return bitIndex >>> ADDRESS_BITS_PER_WORD; + } + + /** + * unset bit. + * + * @param segment target segment. + * @param baseOffset bits base offset. + * @param index bit index from base offset. + */ + public static void bitUnSet(MemorySegment segment, int baseOffset, int index) { + int offset = baseOffset + byteIndex(index); + byte current = segment.get(offset); + current &= ~(1 << (index & BIT_BYTE_INDEX_MASK)); + segment.put(offset, current); + } + + /** + * set bit. + * + * @param segment target segment. + * @param baseOffset bits base offset. + * @param index bit index from base offset. + */ + public static void bitSet(MemorySegment segment, int baseOffset, int index) { + int offset = baseOffset + byteIndex(index); + byte current = segment.get(offset); + current |= (1 << (index & BIT_BYTE_INDEX_MASK)); + segment.put(offset, current); + } + + /** + * read bit. + * + * @param segment target segment. + * @param baseOffset bits base offset. + * @param index bit index from base offset. 
+ */ + public static boolean bitGet(MemorySegment segment, int baseOffset, int index) { + int offset = baseOffset + byteIndex(index); + byte current = segment.get(offset); + return (current & (1 << (index & BIT_BYTE_INDEX_MASK))) != 0; + } + + /** + * unset bit from segments. + * + * @param segments target segments. + * @param baseOffset bits base offset. + * @param index bit index from base offset. + */ + public static void bitUnSet(MemorySegment[] segments, int baseOffset, int index) { + if (segments.length == 1) { + MemorySegment segment = segments[0]; + int offset = baseOffset + byteIndex(index); + byte current = segment.get(offset); + current &= ~(1 << (index & BIT_BYTE_INDEX_MASK)); + segment.put(offset, current); + } else { + bitUnSetMultiSegments(segments, baseOffset, index); + } + } + + private static void bitUnSetMultiSegments(MemorySegment[] segments, int baseOffset, int index) { + int offset = baseOffset + byteIndex(index); + int segSize = segments[0].size(); + int segIndex = offset / segSize; + int segOffset = offset - segIndex * segSize; // equal to % + MemorySegment segment = segments[segIndex]; + + byte current = segment.get(segOffset); + current &= ~(1 << (index & BIT_BYTE_INDEX_MASK)); + segment.put(segOffset, current); + } + + /** + * set bit from segments. + * + * @param segments target segments. + * @param baseOffset bits base offset. + * @param index bit index from base offset. 
+ */ + public static void bitSet(MemorySegment[] segments, int baseOffset, int index) { + if (segments.length == 1) { + int offset = baseOffset + byteIndex(index); + MemorySegment segment = segments[0]; + byte current = segment.get(offset); + current |= (1 << (index & BIT_BYTE_INDEX_MASK)); + segment.put(offset, current); + } else { + bitSetMultiSegments(segments, baseOffset, index); + } + } + + private static void bitSetMultiSegments(MemorySegment[] segments, int baseOffset, int index) { + int offset = baseOffset + byteIndex(index); + int segSize = segments[0].size(); + int segIndex = offset / segSize; + int segOffset = offset - segIndex * segSize; // equal to % + MemorySegment segment = segments[segIndex]; + + byte current = segment.get(segOffset); + current |= (1 << (index & BIT_BYTE_INDEX_MASK)); + segment.put(segOffset, current); + } + + /** + * read bit from segments. + * + * @param segments target segments. + * @param baseOffset bits base offset. + * @param index bit index from base offset. + */ + public static boolean bitGet(MemorySegment[] segments, int baseOffset, int index) { + int offset = baseOffset + byteIndex(index); + byte current = getByte(segments, offset); + return (current & (1 << (index & BIT_BYTE_INDEX_MASK))) != 0; + } + + /** + * get boolean from segments. + * + * @param segments target segments. + * @param offset value offset. + */ + public static boolean getBoolean(MemorySegment[] segments, int offset) { + if (inFirstSegment(segments, offset, 1)) { + return segments[0].getBoolean(offset); + } else { + return getBooleanMultiSegments(segments, offset); + } + } + + private static boolean getBooleanMultiSegments(MemorySegment[] segments, int offset) { + int segSize = segments[0].size(); + int segIndex = offset / segSize; + int segOffset = offset - segIndex * segSize; // equal to % + return segments[segIndex].getBoolean(segOffset); + } + + /** + * set boolean from segments. + * + * @param segments target segments. 
+ * @param offset value offset. + */ + public static void setBoolean(MemorySegment[] segments, int offset, boolean value) { + if (inFirstSegment(segments, offset, 1)) { + segments[0].putBoolean(offset, value); + } else { + setBooleanMultiSegments(segments, offset, value); + } + } + + private static void setBooleanMultiSegments(MemorySegment[] segments, int offset, boolean value) { + int segSize = segments[0].size(); + int segIndex = offset / segSize; + int segOffset = offset - segIndex * segSize; // equal to % + segments[segIndex].putBoolean(segOffset, value); + } + + /** + * get byte from segments. + * + * @param segments target segments. + * @param offset value offset. + */ + public static byte getByte(MemorySegment[] segments, int offset) { + if (inFirstSegment(segments, offset, 1)) { + return segments[0].get(offset); + } else { + return getByteMultiSegments(segments, offset); + } + } + + private static byte getByteMultiSegments(MemorySegment[] segments, int offset) { + int segSize = segments[0].size(); + int segIndex = offset / segSize; + int segOffset = offset - segIndex * segSize; // equal to % + return segments[segIndex].get(segOffset); + } + + /** + * set byte from segments. + * + * @param segments target segments. + * @param offset value offset. + */ + public static void setByte(MemorySegment[] segments, int offset, byte value) { + if (inFirstSegment(segments, offset, 1)) { + segments[0].put(offset, value); + } else { + setByteMultiSegments(segments, offset, value); + } + } + + private static void setByteMultiSegments(MemorySegment[] segments, int offset, byte value) { + int segSize = segments[0].size(); + int segIndex = offset / segSize; + int segOffset = offset - segIndex * segSize; // equal to % + segments[segIndex].put(segOffset, value); + } + + /** + * get int from segments. + * + * @param segments target segments. + * @param offset value offset. 
+ */ + public static int getInt(MemorySegment[] segments, int offset) { + if (inFirstSegment(segments, offset, 4)) { + return segments[0].getInt(offset); + } else { + return getIntMultiSegments(segments, offset); + } + } + + private static int getIntMultiSegments(MemorySegment[] segments, int offset) { + int segSize = segments[0].size(); + int segIndex = offset / segSize; + int segOffset = offset - segIndex * segSize; // equal to % + + if (segOffset < segSize - 3) { + return segments[segIndex].getInt(segOffset); + } else { + return getIntSlowly(segments, segSize, segIndex, segOffset); + } + } + + private static int getIntSlowly( + MemorySegment[] segments, int segSize, int segNum, int segOffset) { + MemorySegment segment = segments[segNum]; + int ret = 0; + for (int i = 0; i < 4; i++) { + if (segOffset == segSize) { + segment = segments[++segNum]; + segOffset = 0; + } + int unsignedByte = segment.get(segOffset) & 0xff; + if (LITTLE_ENDIAN) { + ret |= (unsignedByte << (i * 8)); + } else { + ret |= (unsignedByte << ((3 - i) * 8)); + } + segOffset++; + } + return ret; + } + + /** + * set int from segments. + * + * @param segments target segments. + * @param offset value offset. 
+ */ + public static void setInt(MemorySegment[] segments, int offset, int value) { + if (inFirstSegment(segments, offset, 4)) { + segments[0].putInt(offset, value); + } else { + setIntMultiSegments(segments, offset, value); + } + } + + private static void setIntMultiSegments(MemorySegment[] segments, int offset, int value) { + int segSize = segments[0].size(); + int segIndex = offset / segSize; + int segOffset = offset - segIndex * segSize; // equal to % + + if (segOffset < segSize - 3) { + segments[segIndex].putInt(segOffset, value); + } else { + setIntSlowly(segments, segSize, segIndex, segOffset, value); + } + } + + private static void setIntSlowly( + MemorySegment[] segments, int segSize, int segNum, int segOffset, int value) { + MemorySegment segment = segments[segNum]; + for (int i = 0; i < 4; i++) { + if (segOffset == segSize) { + segment = segments[++segNum]; + segOffset = 0; + } + int unsignedByte; + if (LITTLE_ENDIAN) { + unsignedByte = value >> (i * 8); + } else { + unsignedByte = value >> ((3 - i) * 8); + } + segment.put(segOffset, (byte) unsignedByte); + segOffset++; + } + } + + /** + * get long from segments. + * + * @param segments target segments. + * @param offset value offset. 
+ */ + public static long getLong(MemorySegment[] segments, int offset) { + if (inFirstSegment(segments, offset, 8)) { + return segments[0].getLong(offset); + } else { + return getLongMultiSegments(segments, offset); + } + } + + private static long getLongMultiSegments(MemorySegment[] segments, int offset) { + int segSize = segments[0].size(); + int segIndex = offset / segSize; + int segOffset = offset - segIndex * segSize; // equal to % + + if (segOffset < segSize - 7) { + return segments[segIndex].getLong(segOffset); + } else { + return getLongSlowly(segments, segSize, segIndex, segOffset); + } + } + + private static long getLongSlowly( + MemorySegment[] segments, int segSize, int segNum, int segOffset) { + MemorySegment segment = segments[segNum]; + long ret = 0; + for (int i = 0; i < 8; i++) { + if (segOffset == segSize) { + segment = segments[++segNum]; + segOffset = 0; + } + long unsignedByte = segment.get(segOffset) & 0xff; + if (LITTLE_ENDIAN) { + ret |= (unsignedByte << (i * 8)); + } else { + ret |= (unsignedByte << ((7 - i) * 8)); + } + segOffset++; + } + return ret; + } + + /** + * set long from segments. + * + * @param segments target segments. + * @param offset value offset. 
+ */ + public static void setLong(MemorySegment[] segments, int offset, long value) { + if (inFirstSegment(segments, offset, 8)) { + segments[0].putLong(offset, value); + } else { + setLongMultiSegments(segments, offset, value); + } + } + + private static void setLongMultiSegments(MemorySegment[] segments, int offset, long value) { + int segSize = segments[0].size(); + int segIndex = offset / segSize; + int segOffset = offset - segIndex * segSize; // equal to % + + if (segOffset < segSize - 7) { + segments[segIndex].putLong(segOffset, value); + } else { + setLongSlowly(segments, segSize, segIndex, segOffset, value); + } + } + + private static void setLongSlowly( + MemorySegment[] segments, int segSize, int segNum, int segOffset, long value) { + MemorySegment segment = segments[segNum]; + for (int i = 0; i < 8; i++) { + if (segOffset == segSize) { + segment = segments[++segNum]; + segOffset = 0; + } + long unsignedByte; + if (LITTLE_ENDIAN) { + unsignedByte = value >> (i * 8); + } else { + unsignedByte = value >> ((7 - i) * 8); + } + segment.put(segOffset, (byte) unsignedByte); + segOffset++; + } + } + + /** + * get short from segments. + * + * @param segments target segments. + * @param offset value offset. + */ + public static short getShort(MemorySegment[] segments, int offset) { + if (inFirstSegment(segments, offset, 2)) { + return segments[0].getShort(offset); + } else { + return getShortMultiSegments(segments, offset); + } + } + + private static short getShortMultiSegments(MemorySegment[] segments, int offset) { + int segSize = segments[0].size(); + int segIndex = offset / segSize; + int segOffset = offset - segIndex * segSize; // equal to % + + if (segOffset < segSize - 1) { + return segments[segIndex].getShort(segOffset); + } else { + return (short) getTwoByteSlowly(segments, segSize, segIndex, segOffset); + } + } + + /** + * set short from segments. + * + * @param segments target segments. + * @param offset value offset. 
+ */ + public static void setShort(MemorySegment[] segments, int offset, short value) { + if (inFirstSegment(segments, offset, 2)) { + segments[0].putShort(offset, value); + } else { + setShortMultiSegments(segments, offset, value); + } + } + + private static void setShortMultiSegments(MemorySegment[] segments, int offset, short value) { + int segSize = segments[0].size(); + int segIndex = offset / segSize; + int segOffset = offset - segIndex * segSize; // equal to % + + if (segOffset < segSize - 1) { + segments[segIndex].putShort(segOffset, value); + } else { + setTwoByteSlowly(segments, segSize, segIndex, segOffset, value, value >> 8); + } + } + + /** + * get float from segments. + * + * @param segments target segments. + * @param offset value offset. + */ + public static float getFloat(MemorySegment[] segments, int offset) { + if (inFirstSegment(segments, offset, 4)) { + return segments[0].getFloat(offset); + } else { + return getFloatMultiSegments(segments, offset); + } + } + + private static float getFloatMultiSegments(MemorySegment[] segments, int offset) { + int segSize = segments[0].size(); + int segIndex = offset / segSize; + int segOffset = offset - segIndex * segSize; // equal to % + + if (segOffset < segSize - 3) { + return segments[segIndex].getFloat(segOffset); + } else { + return Float.intBitsToFloat(getIntSlowly(segments, segSize, segIndex, segOffset)); + } + } + + /** + * set float from segments. + * + * @param segments target segments. + * @param offset value offset. 
+ */ + public static void setFloat(MemorySegment[] segments, int offset, float value) { + if (inFirstSegment(segments, offset, 4)) { + segments[0].putFloat(offset, value); + } else { + setFloatMultiSegments(segments, offset, value); + } + } + + private static void setFloatMultiSegments(MemorySegment[] segments, int offset, float value) { + int segSize = segments[0].size(); + int segIndex = offset / segSize; + int segOffset = offset - segIndex * segSize; // equal to % + + if (segOffset < segSize - 3) { + segments[segIndex].putFloat(segOffset, value); + } else { + setIntSlowly(segments, segSize, segIndex, segOffset, Float.floatToRawIntBits(value)); + } + } + + /** + * get double from segments. + * + * @param segments target segments. + * @param offset value offset. + */ + public static double getDouble(MemorySegment[] segments, int offset) { + if (inFirstSegment(segments, offset, 8)) { + return segments[0].getDouble(offset); + } else { + return getDoubleMultiSegments(segments, offset); + } + } + + private static double getDoubleMultiSegments(MemorySegment[] segments, int offset) { + int segSize = segments[0].size(); + int segIndex = offset / segSize; + int segOffset = offset - segIndex * segSize; // equal to % + + if (segOffset < segSize - 7) { + return segments[segIndex].getDouble(segOffset); + } else { + return Double.longBitsToDouble(getLongSlowly(segments, segSize, segIndex, segOffset)); + } + } + + /** + * set double from segments. + * + * @param segments target segments. + * @param offset value offset. 
+ */ + public static void setDouble(MemorySegment[] segments, int offset, double value) { + if (inFirstSegment(segments, offset, 8)) { + segments[0].putDouble(offset, value); + } else { + setDoubleMultiSegments(segments, offset, value); + } + } + + private static void setDoubleMultiSegments(MemorySegment[] segments, int offset, double value) { + int segSize = segments[0].size(); + int segIndex = offset / segSize; + int segOffset = offset - segIndex * segSize; // equal to % + + if (segOffset < segSize - 7) { + segments[segIndex].putDouble(segOffset, value); + } else { + setLongSlowly(segments, segSize, segIndex, segOffset, Double.doubleToRawLongBits(value)); + } + } + + private static int getTwoByteSlowly( + MemorySegment[] segments, int segSize, int segNum, int segOffset) { + MemorySegment segment = segments[segNum]; + int ret = 0; + for (int i = 0; i < 2; i++) { + if (segOffset == segSize) { + segment = segments[++segNum]; + segOffset = 0; + } + int unsignedByte = segment.get(segOffset) & 0xff; + if (LITTLE_ENDIAN) { + ret |= (unsignedByte << (i * 8)); + } else { + ret |= (unsignedByte << ((1 - i) * 8)); + } + segOffset++; + } + return ret; + } + + private static void setTwoByteSlowly( + MemorySegment[] segments, int segSize, int segNum, int segOffset, int b1, int b2) { + MemorySegment segment = segments[segNum]; + segment.put(segOffset, (byte) (LITTLE_ENDIAN ? b1 : b2)); + segOffset++; + if (segOffset == segSize) { + segment = segments[++segNum]; + segOffset = 0; + } + segment.put(segOffset, (byte) (LITTLE_ENDIAN ? b2 : b1)); + } + + /** + * Gets an instance of {@link DecimalData} from underlying {@link MemorySegment}. 
+ */ + public static DecimalData readDecimalData( + MemorySegment[] segments, + int baseOffset, + long offsetAndSize, + int precision, + int scale) { + final int size = ((int) offsetAndSize); + int subOffset = (int) (offsetAndSize >> 32); + byte[] bytes = new byte[size]; + copyToBytes(segments, baseOffset + subOffset, bytes, 0, size); + return DecimalData.fromUnscaledBytes(bytes, precision, scale); + } + + /** + * Gets an instance of {@link TimestampData} from underlying {@link MemorySegment}. + * + * @param segments the underlying MemorySegments + * @param baseOffset the base offset of current instance of {@code TimestampData} + * @param offsetAndNanos the offset of milli-seconds part and nanoseconds + * @return an instance of {@link TimestampData} + */ + public static TimestampData readTimestampData( + MemorySegment[] segments, int baseOffset, long offsetAndNanos) { + final int nanoOfMillisecond = (int) offsetAndNanos; + final int subOffset = (int) (offsetAndNanos >> 32); + final long millisecond = getLong(segments, baseOffset + subOffset); + return TimestampData.fromEpochMillis(millisecond, nanoOfMillisecond); + } + + /** + * Get binary, if len less than 8, will be include in variablePartOffsetAndLen. + * + * <p>Note: Need to consider the ByteOrder. + * + * @param baseOffset base offset of composite binary format. + * @param fieldOffset absolute start offset of 'variablePartOffsetAndLen'. + * @param variablePartOffsetAndLen a long value, real data or offset and len. 
+ */ + public static byte[] readBinary( + MemorySegment[] segments, + int baseOffset, + int fieldOffset, + long variablePartOffsetAndLen) { + long mark = variablePartOffsetAndLen & HIGHEST_FIRST_BIT; + if (mark == 0) { + final int subOffset = (int) (variablePartOffsetAndLen >> 32); + final int len = (int) variablePartOffsetAndLen; + return BinarySegmentUtils.copyToBytes(segments, baseOffset + subOffset, len); + } else { + int len = (int) ((variablePartOffsetAndLen & HIGHEST_SECOND_TO_EIGHTH_BIT) >>> 56); + if (BinarySegmentUtils.LITTLE_ENDIAN) { + return BinarySegmentUtils.copyToBytes(segments, fieldOffset, len); + } else { + // fieldOffset + 1 to skip header. + return BinarySegmentUtils.copyToBytes(segments, fieldOffset + 1, len); + } + } + } + + /** + * Get binary string, if len less than 8, will be include in variablePartOffsetAndLen. + * + * <p>Note: Need to consider the ByteOrder. + * + * @param baseOffset base offset of composite binary format. + * @param fieldOffset absolute start offset of 'variablePartOffsetAndLen'. + * @param variablePartOffsetAndLen a long value, real data or offset and len. + */ + public static StringData readStringData( + MemorySegment[] segments, + int baseOffset, + int fieldOffset, + long variablePartOffsetAndLen) { + long mark = variablePartOffsetAndLen & HIGHEST_FIRST_BIT; + if (mark == 0) { + final int subOffset = (int) (variablePartOffsetAndLen >> 32); + final int len = (int) variablePartOffsetAndLen; + return BinaryStringData.fromAddress(segments, baseOffset + subOffset, len); + } else { + int len = (int) ((variablePartOffsetAndLen & HIGHEST_SECOND_TO_EIGHTH_BIT) >>> 56); + if (BinarySegmentUtils.LITTLE_ENDIAN) { + return BinaryStringData.fromAddress(segments, fieldOffset, len); + } else { + // fieldOffset + 1 to skip header. + return BinaryStringData.fromAddress(segments, fieldOffset + 1, len); + } + } + } + + /** + * Gets an instance of {@link RawValueData} from underlying {@link MemorySegment}. 
+ */ + public static <T> RawValueData<T> readRawValueData( + MemorySegment[] segments, int baseOffset, long offsetAndSize) { + final int size = ((int) offsetAndSize); + int offset = (int) (offsetAndSize >> 32); + return new BinaryRawValueData<>(segments, offset + baseOffset, size, null); + } + + /** + * Gets an instance of {@link MapData} from underlying {@link MemorySegment}. + */ + public static MapData readMapData( + MemorySegment[] segments, int baseOffset, long offsetAndSize) { + final int size = ((int) offsetAndSize); + int offset = (int) (offsetAndSize >> 32); + BinaryMapData map = new BinaryMapData(); + map.pointTo(segments, offset + baseOffset, size); + return map; + } + + /** + * Gets an instance of {@link ArrayData} from underlying {@link MemorySegment}. + */ + public static ArrayData readArrayData( + MemorySegment[] segments, int baseOffset, long offsetAndSize) { + final int size = ((int) offsetAndSize); + int offset = (int) (offsetAndSize >> 32); + BinaryArrayData array = new BinaryArrayData(); + array.pointTo(segments, offset + baseOffset, size); + return array; + } + + /** + * Gets an instance of {@link RowData} from underlying {@link MemorySegment}. + */ + public static RowData readRowData( + MemorySegment[] segments, int numFields, int baseOffset, long offsetAndSize) { + final int size = ((int) offsetAndSize); + int offset = (int) (offsetAndSize >> 32); + NestedRowData row = new NestedRowData(numFields); + row.pointTo(segments, offset + baseOffset, size); + return row; + } + + /** + * Find equal segments2 in segments1. + * @param segments1 segs to find. + * @param segments2 sub segs. + * @return Return the found offset, return -1 if not find. + */ + public static int find( + MemorySegment[] segments1, int offset1, int numBytes1, + MemorySegment[] segments2, int offset2, int numBytes2) { + if (numBytes2 == 0) { // quick way 1. 
+ return offset1; + } + if (inFirstSegment(segments1, offset1, numBytes1) && + inFirstSegment(segments2, offset2, numBytes2)) { + byte first = segments2[0].get(offset2); + int end = numBytes1 - numBytes2 + offset1; + for (int i = offset1; i <= end; i++) { + // quick way 2: equal first byte. + if (segments1[0].get(i) == first && + segments1[0].equalTo(segments2[0], i, offset2, numBytes2)) { + return i; + } + } + return -1; + } else { + return findInMultiSegments(segments1, offset1, numBytes1, segments2, offset2, numBytes2); + } + } + + private static int findInMultiSegments( + MemorySegment[] segments1, int offset1, int numBytes1, + MemorySegment[] segments2, int offset2, int numBytes2) { + int end = numBytes1 - numBytes2 + offset1; + for (int i = offset1; i <= end; i++) { + if (equalsMultiSegments(segments1, i, segments2, offset2, numBytes2)) { + return i; + } + } + return -1; + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryStringData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryStringData.java new file mode 100644 index 0000000..5e85554 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryStringData.java @@ -0,0 +1,856 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.data.binary; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.table.data.StringData; + +import javax.annotation.Nonnull; + +import java.util.Arrays; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A lazily binary implementation of {@link StringData} which is backed by {@link MemorySegment}s + * and {@link String}. + * + * <p>Either {@link MemorySegment}s or {@link String} must be provided when + * constructing {@link BinaryStringData}. The other representation will be materialized when needed. + * + * <p>It provides many useful methods for comparison, search, and so on. 
+ */ +@Internal +public final class BinaryStringData extends LazyBinaryFormat<String> implements StringData { + + public static final BinaryStringData EMPTY_UTF8 = BinaryStringData.fromBytes(StringUtf8Utils.encodeUTF8("")); + + public BinaryStringData() {} + + public BinaryStringData(String javaObject) { + super(javaObject); + } + + public BinaryStringData(MemorySegment[] segments, int offset, int sizeInBytes) { + super(segments, offset, sizeInBytes); + } + + public BinaryStringData(MemorySegment[] segments, int offset, int sizeInBytes, String javaObject) { + super(segments, offset, sizeInBytes, javaObject); + } + + // ------------------------------------------------------------------------------------------ + // Construction Utilities + // ------------------------------------------------------------------------------------------ + + /** + * Creates an BinaryStringData from given address (base and offset) and length. + */ + public static BinaryStringData fromAddress( + MemorySegment[] segments, + int offset, + int numBytes) { + return new BinaryStringData(segments, offset, numBytes); + } + + /** + * Creates an BinaryStringData from given java String. + */ + public static BinaryStringData fromString(String str) { + if (str == null) { + return null; + } else { + return new BinaryStringData(str); + } + } + + /** + * Creates an BinaryStringData from given UTF-8 bytes. + */ + public static BinaryStringData fromBytes(byte[] bytes) { + return fromBytes(bytes, 0, bytes.length); + } + + /** + * Creates an BinaryStringData from given UTF-8 bytes with offset and number of bytes. + */ + public static BinaryStringData fromBytes(byte[] bytes, int offset, int numBytes) { + return new BinaryStringData( + new MemorySegment[] {MemorySegmentFactory.wrap(bytes)}, + offset, + numBytes); + } + + /** + * Creates an BinaryStringData that contains `length` spaces. 
+ */ + public static BinaryStringData blankString(int length) { + byte[] spaces = new byte[length]; + Arrays.fill(spaces, (byte) ' '); + return fromBytes(spaces); + } + + // ------------------------------------------------------------------------------------------ + // Public Interfaces + // ------------------------------------------------------------------------------------------ + + @Override + public byte[] toBytes() { + ensureMaterialized(); + return BinarySegmentUtils.getBytes( + binarySection.segments, + binarySection.offset, + binarySection.sizeInBytes); + } + + @Override + public boolean equals(Object o) { + if (o instanceof BinaryStringData) { + BinaryStringData other = (BinaryStringData) o; + if (javaObject != null && other.javaObject != null) { + return javaObject.equals(other.javaObject); + } + + ensureMaterialized(); + other.ensureMaterialized(); + return binarySection.equals(other.binarySection); + } else { + return false; + } + } + + @Override + public int hashCode() { + ensureMaterialized(); + return binarySection.hashCode(); + } + + @Override + public String toString() { + if (javaObject == null) { + byte[] bytes = BinarySegmentUtils.allocateReuseBytes(binarySection.sizeInBytes); + BinarySegmentUtils.copyToBytes(binarySection.segments, binarySection.offset, bytes, 0, binarySection.sizeInBytes); + javaObject = StringUtf8Utils.decodeUTF8(bytes, 0, binarySection.sizeInBytes); + } + return javaObject; + } + + /** + * Compares two strings lexicographically. + * Since UTF-8 uses groups of six bits, it is sometimes useful to use octal notation which + * uses 3-bit groups. With a calculator which can convert between hexadecimal and octal it + * can be easier to manually create or interpret UTF-8 compared with using binary. + * So we just compare the binary. 
+ */ + @Override + public int compareTo(@Nonnull StringData o) { + // BinaryStringData is the only implementation of StringData + BinaryStringData other = (BinaryStringData) o; + if (javaObject != null && other.javaObject != null) { + return javaObject.compareTo(other.javaObject); + } + + ensureMaterialized(); + other.ensureMaterialized(); + if (binarySection.segments.length == 1 && other.binarySection.segments.length == 1) { + + int len = Math.min(binarySection.sizeInBytes, other.binarySection.sizeInBytes); + MemorySegment seg1 = binarySection.segments[0]; + MemorySegment seg2 = other.binarySection.segments[0]; + + for (int i = 0; i < len; i++) { + int res = + (seg1.get(binarySection.offset + i) & 0xFF) - (seg2.get(other.binarySection.offset + i) & 0xFF); + if (res != 0) { + return res; + } + } + return binarySection.sizeInBytes - other.binarySection.sizeInBytes; + } + + // if there are multi segments. + return compareMultiSegments(other); + } + + /** + * Find the boundaries of segments, and then compare MemorySegment. + */ + private int compareMultiSegments(BinaryStringData other) { + + if (binarySection.sizeInBytes == 0 || other.binarySection.sizeInBytes == 0) { + return binarySection.sizeInBytes - other.binarySection.sizeInBytes; + } + + int len = Math.min(binarySection.sizeInBytes, other.binarySection.sizeInBytes); + + MemorySegment seg1 = binarySection.segments[0]; + MemorySegment seg2 = other.binarySection.segments[0]; + + int segmentSize = binarySection.segments[0].size(); + int otherSegmentSize = other.binarySection.segments[0].size(); + + int sizeOfFirst1 = segmentSize - binarySection.offset; + int sizeOfFirst2 = otherSegmentSize - other.binarySection.offset; + + int varSegIndex1 = 1; + int varSegIndex2 = 1; + + // find the first segment of this string. 
+ while (sizeOfFirst1 <= 0) { + sizeOfFirst1 += segmentSize; + seg1 = binarySection.segments[varSegIndex1++]; + } + + while (sizeOfFirst2 <= 0) { + sizeOfFirst2 += otherSegmentSize; + seg2 = other.binarySection.segments[varSegIndex2++]; + } + + int offset1 = segmentSize - sizeOfFirst1; + int offset2 = otherSegmentSize - sizeOfFirst2; + + int needCompare = Math.min(Math.min(sizeOfFirst1, sizeOfFirst2), len); + + while (needCompare > 0) { + // compare in one segment. + for (int i = 0; i < needCompare; i++) { + int res = (seg1.get(offset1 + i) & 0xFF) - (seg2.get(offset2 + i) & 0xFF); + if (res != 0) { + return res; + } + } + if (needCompare == len) { + break; + } + len -= needCompare; + // next segment + if (sizeOfFirst1 < sizeOfFirst2) { //I am smaller + seg1 = binarySection.segments[varSegIndex1++]; + offset1 = 0; + offset2 += needCompare; + sizeOfFirst1 = segmentSize; + sizeOfFirst2 -= needCompare; + } else if (sizeOfFirst1 > sizeOfFirst2) { //other is smaller + seg2 = other.binarySection.segments[varSegIndex2++]; + offset2 = 0; + offset1 += needCompare; + sizeOfFirst2 = otherSegmentSize; + sizeOfFirst1 -= needCompare; + } else { // same, should go ahead both. + seg1 = binarySection.segments[varSegIndex1++]; + seg2 = other.binarySection.segments[varSegIndex2++]; + offset1 = 0; + offset2 = 0; + sizeOfFirst1 = segmentSize; + sizeOfFirst2 = otherSegmentSize; + } + needCompare = Math.min(Math.min(sizeOfFirst1, sizeOfFirst2), len); + } + + checkArgument(needCompare == len); + + return binarySection.sizeInBytes - other.binarySection.sizeInBytes; + } + + // ------------------------------------------------------------------------------------------ + // Public methods on BinaryStringData + // ------------------------------------------------------------------------------------------ + + /** + * Returns the number of UTF-8 code points in the string. 
+ */ + public int numChars() { + ensureMaterialized(); + if (inFirstSegment()) { + int len = 0; + for (int i = 0; i < binarySection.sizeInBytes; i += numBytesForFirstByte(getByteOneSegment(i))) { + len++; + } + return len; + } else { + return numCharsMultiSegs(); + } + } + + private int numCharsMultiSegs() { + int len = 0; + int segSize = binarySection.segments[0].size(); + BinaryStringData.SegmentAndOffset index = firstSegmentAndOffset(segSize); + int i = 0; + while (i < binarySection.sizeInBytes) { + int charBytes = numBytesForFirstByte(index.value()); + i += charBytes; + len++; + index.skipBytes(charBytes, segSize); + } + return len; + } + + /** + * Returns the {@code byte} value at the specified index. An index ranges from {@code 0} to + * {@code binarySection.sizeInBytes - 1}. + * + * @param index the index of the {@code byte} value. + * @return the {@code byte} value at the specified index of this UTF-8 bytes. + * @exception IndexOutOfBoundsException if the {@code index} + * argument is negative or not less than the length of this + * UTF-8 bytes. 
+ */ + public byte byteAt(int index) { + ensureMaterialized(); + int globalOffset = binarySection.offset + index; + int size = binarySection.segments[0].size(); + if (globalOffset < size) { + return binarySection.segments[0].get(globalOffset); + } else { + return binarySection.segments[globalOffset / size].get(globalOffset % size); + } + } + + @Override + public MemorySegment[] getSegments() { + ensureMaterialized(); + return super.getSegments(); + } + + @Override + public int getOffset() { + ensureMaterialized(); + return super.getOffset(); + } + + @Override + public int getSizeInBytes() { + ensureMaterialized(); + return super.getSizeInBytes(); + } + + public void ensureMaterialized() { + ensureMaterialized(null); + } + + @Override + protected BinarySection materialize(TypeSerializer<String> serializer) { + if (serializer != null) { + throw new IllegalArgumentException("BinaryStringData does not support custom serializers"); + } + + byte[] bytes = StringUtf8Utils.encodeUTF8(javaObject); + return new BinarySection( + new MemorySegment[]{MemorySegmentFactory.wrap(bytes)}, + 0, + bytes.length + ); + } + + /** + * Copy a new {@code BinaryStringData}. + */ + public BinaryStringData copy() { + ensureMaterialized(); + byte[] copy = BinarySegmentUtils.copyToBytes(binarySection.segments, binarySection.offset, binarySection.sizeInBytes); + return new BinaryStringData(new MemorySegment[] {MemorySegmentFactory.wrap(copy)}, + 0, binarySection.sizeInBytes, javaObject); + } + + /** + * Returns a binary string that is a substring of this binary string. The substring begins at + * the specified {@code beginIndex} and extends to the character at index {@code endIndex - 1}. + * + * <p>Examples: + * <blockquote><pre> + * fromString("hamburger").substring(4, 8) returns binary string "urge" + * fromString("smiles").substring(1, 5) returns binary string "mile" + * </pre></blockquote> + * + * @param beginIndex the beginning index, inclusive. 
+ * @param endIndex the ending index, exclusive. + * @return the specified substring, return EMPTY_UTF8 when index out of bounds + * instead of StringIndexOutOfBoundsException. + */ + public BinaryStringData substring(int beginIndex, int endIndex) { + ensureMaterialized(); + if (endIndex <= beginIndex || beginIndex >= binarySection.sizeInBytes) { + return EMPTY_UTF8; + } + if (inFirstSegment()) { + MemorySegment segment = binarySection.segments[0]; + int i = 0; + int c = 0; + while (i < binarySection.sizeInBytes && c < beginIndex) { + i += numBytesForFirstByte(segment.get(i + binarySection.offset)); + c += 1; + } + + int j = i; + while (i < binarySection.sizeInBytes && c < endIndex) { + i += numBytesForFirstByte(segment.get(i + binarySection.offset)); + c += 1; + } + + if (i > j) { + byte[] bytes = new byte[i - j]; + segment.get(binarySection.offset + j, bytes, 0, i - j); + return fromBytes(bytes); + } else { + return EMPTY_UTF8; + } + } else { + return substringMultiSegs(beginIndex, endIndex); + } + } + + private BinaryStringData substringMultiSegs(final int start, final int until) { + int segSize = binarySection.segments[0].size(); + BinaryStringData.SegmentAndOffset index = firstSegmentAndOffset(segSize); + int i = 0; + int c = 0; + while (i < binarySection.sizeInBytes && c < start) { + int charSize = numBytesForFirstByte(index.value()); + i += charSize; + index.skipBytes(charSize, segSize); + c += 1; + } + + int j = i; + while (i < binarySection.sizeInBytes && c < until) { + int charSize = numBytesForFirstByte(index.value()); + i += charSize; + index.skipBytes(charSize, segSize); + c += 1; + } + + if (i > j) { + return fromBytes(BinarySegmentUtils.copyToBytes(binarySection.segments, binarySection.offset + j, i - j)); + } else { + return EMPTY_UTF8; + } + } + + /** + * Returns true if and only if this BinaryStringData contains the specified + * sequence of bytes values. 
+ * + * @param s the sequence to search for + * @return true if this BinaryStringData contains {@code s}, false otherwise + */ + public boolean contains(final BinaryStringData s) { + ensureMaterialized(); + s.ensureMaterialized(); + if (s.binarySection.sizeInBytes == 0) { + return true; + } + int find = BinarySegmentUtils.find( + binarySection.segments, binarySection.offset, binarySection.sizeInBytes, + s.binarySection.segments, s.binarySection.offset, s.binarySection.sizeInBytes); + return find != -1; + } + + /** + * Tests if this BinaryStringData starts with the specified prefix. + * + * @param prefix the prefix. + * @return {@code true} if the bytes represented by the argument is a prefix of the bytes + * represented by this string; {@code false} otherwise. Note also that {@code true} + * will be returned if the argument is an empty BinaryStringData or is equal to this + * {@code BinaryStringData} object as determined by the {@link #equals(Object)} method. + */ + public boolean startsWith(final BinaryStringData prefix) { + ensureMaterialized(); + prefix.ensureMaterialized(); + return matchAt(prefix, 0); + } + + /** + * Tests if this BinaryStringData ends with the specified suffix. + * + * @param suffix the suffix. + * @return {@code true} if the bytes represented by the argument is a suffix of the bytes + * represented by this object; {@code false} otherwise. Note that the result will + * be {@code true} if the argument is the empty string or is equal to this + * {@code BinaryStringData} object as determined by the {@link #equals(Object)} method. + */ + public boolean endsWith(final BinaryStringData suffix) { + ensureMaterialized(); + suffix.ensureMaterialized(); + return matchAt(suffix, binarySection.sizeInBytes - suffix.binarySection.sizeInBytes); + } + + /** + * Returns a string whose value is this string, with any leading and trailing + * whitespace removed. 
+	 *
+	 * <p>Only the ASCII space character ({@code 0x20}) is treated as white space here; tabs,
+	 * line breaks and other white space characters are kept.
+	 *
+	 * @return a string holding a fresh copy of this string's bytes without the leading and
+	 * trailing spaces, or {@code EMPTY_UTF8} if no bytes remain. The copy is made even when
+	 * there is nothing to strip.
+	 */
+	public BinaryStringData trim() {
+		ensureMaterialized();
+		if (inFirstSegment()) {
+			int s = 0;
+			int e = this.binarySection.sizeInBytes - 1;
+			// skip all of the space (0x20) in the left side
+			while (s < this.binarySection.sizeInBytes && getByteOneSegment(s) == 0x20) {
+				s++;
+			}
+			// skip all of the space (0x20) in the right side
+			while (e >= s && getByteOneSegment(e) == 0x20) {
+				e--;
+			}
+			if (s > e) {
+				// empty string
+				return EMPTY_UTF8;
+			} else {
+				return copyBinaryStringInOneSeg(s, e - s + 1);
+			}
+		} else {
+			return trimMultiSegs();
+		}
+	}
+
+	/** Multi-segment variant of {@link #trim()}; strips the same ASCII spaces (0x20). */
+	private BinaryStringData trimMultiSegs() {
+		int s = 0;
+		int e = this.binarySection.sizeInBytes - 1;
+		int segSize = binarySection.segments[0].size();
+		BinaryStringData.SegmentAndOffset front = firstSegmentAndOffset(segSize);
+		// skip all of the space (0x20) in the left side
+		while (s < this.binarySection.sizeInBytes && front.value() == 0x20) {
+			s++;
+			front.nextByte(segSize);
+		}
+		BinaryStringData.SegmentAndOffset behind = lastSegmentAndOffset(segSize);
+		// skip all of the space (0x20) in the right side
+		while (e >= s && behind.value() == 0x20) {
+			e--;
+			behind.previousByte(segSize);
+		}
+		if (s > e) {
+			// empty string
+			return EMPTY_UTF8;
+		} else {
+			// copyBinaryString takes an inclusive end index
+			return copyBinaryString(s, e);
+		}
+	}
+
+	/**
+	 * Returns the index within this string of the first occurrence of the
+	 * specified substring, starting at the specified index.
+	 *
+	 * @param str the substring to search for.
+	 * @param fromIndex the index from which to start the search.
+	 * @return the index of the first occurrence of the specified substring,
+	 * starting at the specified index,
+	 * or {@code -1} if there is no such occurrence.
+	 *
+	 * <p>{@code fromIndex} and the returned index count characters (UTF-8 sequences), not
+	 * bytes. An empty {@code str} yields {@code 0} regardless of {@code fromIndex}.
+	 */
+	public int indexOf(BinaryStringData str, int fromIndex) {
+		ensureMaterialized();
+		str.ensureMaterialized();
+		final int needleSize = str.binarySection.sizeInBytes;
+		if (needleSize == 0) {
+			return 0;
+		}
+		if (!inFirstSegment()) {
+			return indexOfMultiSegs(str, fromIndex);
+		}
+		final int haystackSize = binarySection.sizeInBytes;
+		int bytePos = 0;
+		int charPos = 0;
+		// skip the first fromIndex characters
+		for (; bytePos < haystackSize && charPos < fromIndex; charPos++) {
+			bytePos += numBytesForFirstByte(getByteOneSegment(bytePos));
+		}
+		// try every following character boundary as a candidate start position
+		while (true) {
+			if (bytePos + needleSize > haystackSize) {
+				return -1;
+			}
+			if (BinarySegmentUtils.equals(binarySection.segments, binarySection.offset + bytePos,
+					str.binarySection.segments, str.binarySection.offset, needleSize)) {
+				return charPos;
+			}
+			bytePos += numBytesForFirstByte(getByteOneSegment(bytePos));
+			charPos++;
+			if (bytePos >= haystackSize) {
+				return -1;
+			}
+		}
+	}
+
+	/**
+	 * Multi-segment variant of {@link #indexOf(BinaryStringData, int)}; walks the characters
+	 * with a segment-aware cursor that is kept in sync with the byte position.
+	 */
+	private int indexOfMultiSegs(BinaryStringData str, int fromIndex) {
+		final int needleSize = str.binarySection.sizeInBytes;
+		final int haystackSize = binarySection.sizeInBytes;
+		final int segSize = binarySection.segments[0].size();
+		final BinaryStringData.SegmentAndOffset cursor = firstSegmentAndOffset(segSize);
+		int bytePos = 0;
+		int charPos = 0;
+		// skip the first fromIndex characters
+		while (bytePos < haystackSize && charPos < fromIndex) {
+			final int width = numBytesForFirstByte(cursor.value());
+			bytePos += width;
+			charPos++;
+			cursor.skipBytes(width, segSize);
+		}
+		while (true) {
+			if (bytePos + needleSize > haystackSize) {
+				return -1;
+			}
+			if (BinarySegmentUtils.equals(binarySection.segments, binarySection.offset + bytePos,
+					str.binarySection.segments, str.binarySection.offset, needleSize)) {
+				return charPos;
+			}
+			final int width = numBytesForFirstByte(cursor.value());
+			bytePos += width;
+			charPos++;
+			cursor.skipBytes(width, segSize);
+			if (bytePos >= haystackSize) {
+				return -1;
+			}
+		}
+	}
+
+	/**
+	 * Converts all of the characters in this
{@code BinaryStringData} to upper case.
+	 *
+	 * <p>Content made only of single-byte characters is converted byte by byte with
+	 * {@link Character#toUpperCase(int)}. In all other cases the conversion is delegated to
+	 * {@link String#toUpperCase(java.util.Locale)} with {@link java.util.Locale#ROOT}, so the
+	 * result does not depend on the JVM's default locale.
+	 *
+	 * @return the {@code BinaryStringData}, converted to uppercase.
+	 */
+	public BinaryStringData toUpperCase() {
+		if (javaObject != null) {
+			return javaToUpperCase();
+		}
+		if (binarySection.sizeInBytes == 0) {
+			return EMPTY_UTF8;
+		}
+		int size = binarySection.segments[0].size();
+		BinaryStringData.SegmentAndOffset segmentAndOffset = startSegmentAndOffset(size);
+		byte[] bytes = new byte[binarySection.sizeInBytes];
+		for (int i = 0; i < binarySection.sizeInBytes; i++) {
+			byte b = segmentAndOffset.value();
+			if (numBytesForFirstByte(b) != 1) {
+				// fallback
+				return javaToUpperCase();
+			}
+			int upper = Character.toUpperCase((int) b);
+			if (upper > 127) {
+				// fallback
+				return javaToUpperCase();
+			}
+			bytes[i] = (byte) upper;
+			segmentAndOffset.nextByte(size);
+		}
+		return fromBytes(bytes);
+	}
+
+	private BinaryStringData javaToUpperCase() {
+		// Locale.ROOT keeps the fallback consistent with the locale-independent
+		// Character.toUpperCase used by the byte-wise fast path; the default-locale
+		// String#toUpperCase() would, e.g., map 'i' to a dotted capital 'İ' under a Turkish locale.
+		return fromString(toString().toUpperCase(java.util.Locale.ROOT));
+	}
+
+	/**
+	 * Converts all of the characters in this {@code BinaryStringData} to lower case.
+	 *
+	 * @return the {@code BinaryStringData}, converted to lowercase.
+	 */
+	public BinaryStringData toLowerCase() {
+		if (javaObject != null) {
+			return javaToLowerCase();
+		}
+		if (binarySection.sizeInBytes == 0) {
+			return EMPTY_UTF8;
+		}
+		// Fast path: convert byte by byte while every byte is a single-byte character whose
+		// lower-case form is still ASCII; otherwise fall back to the String-based conversion.
+		int size = binarySection.segments[0].size();
+		BinaryStringData.SegmentAndOffset segmentAndOffset = startSegmentAndOffset(size);
+		byte[] bytes = new byte[binarySection.sizeInBytes];
+		for (int i = 0; i < binarySection.sizeInBytes; i++) {
+			byte b = segmentAndOffset.value();
+			if (numBytesForFirstByte(b) != 1) {
+				// fallback
+				return javaToLowerCase();
+			}
+			int lower = Character.toLowerCase((int) b);
+			if (lower > 127) {
+				// fallback
+				return javaToLowerCase();
+			}
+			bytes[i] = (byte) lower;
+			segmentAndOffset.nextByte(size);
+		}
+		return fromBytes(bytes);
+	}
+
+	private BinaryStringData javaToLowerCase() {
+		// Locale.ROOT keeps the fallback consistent with the locale-independent
+		// Character.toLowerCase used by the byte-wise fast path; the default-locale
+		// String#toLowerCase() would, e.g., map 'I' to a dotless 'ı' under a Turkish locale.
+		return fromString(toString().toLowerCase(java.util.Locale.ROOT));
+	}
+
+	// ------------------------------------------------------------------------------------------
+	// Internal methods on BinaryStringData
+	// ------------------------------------------------------------------------------------------
+
+	/** Reads byte {@code i} of this string; only valid when the string lies in the first segment. */
+	byte getByteOneSegment(int i) {
+		return binarySection.segments[0].get(binarySection.offset + i);
+	}
+
+	/** Returns whether all bytes of this string are located in the first memory segment. */
+	boolean inFirstSegment() {
+		return binarySection.sizeInBytes + binarySection.offset <= binarySection.segments[0].size();
+	}
+
+	/**
+	 * Returns whether the bytes of {@code s} occur in this string starting at byte position
+	 * {@code pos}. A negative {@code pos}, or a match that would run past the end of this
+	 * string, yields {@code false}.
+	 */
+	private boolean matchAt(final BinaryStringData s, int pos) {
+		return (inFirstSegment() && s.inFirstSegment()) ?
+			matchAtOneSeg(s, pos) : matchAtVarSeg(s, pos);
+	}
+
+	private boolean matchAtOneSeg(final BinaryStringData s, int pos) {
+		return s.binarySection.sizeInBytes + pos <= binarySection.sizeInBytes && pos >= 0 &&
+			binarySection.segments[0].equalTo(
+				s.binarySection.segments[0],
+				binarySection.offset + pos,
+				s.binarySection.offset,
+				s.binarySection.sizeInBytes);
+	}
+
+	private boolean matchAtVarSeg(final BinaryStringData s, int pos) {
+		return s.binarySection.sizeInBytes + pos <= binarySection.sizeInBytes && pos >= 0 &&
+			BinarySegmentUtils.equals(
+				binarySection.segments,
+				binarySection.offset + pos,
+				s.binarySection.segments,
+				s.binarySection.offset,
+				s.binarySection.sizeInBytes);
+	}
+
+	/** Copies {@code len} bytes starting at byte {@code start}; the string must lie in the first segment. */
+	BinaryStringData copyBinaryStringInOneSeg(int start, int len) {
+		byte[] newBytes = new byte[len];
+		binarySection.segments[0].get(binarySection.offset + start, newBytes, 0, len);
+		return fromBytes(newBytes);
+	}
+
+	/** Copies the bytes in the inclusive byte range [start, end], possibly spanning several segments. */
+	BinaryStringData copyBinaryString(int start, int end) {
+		int len = end - start + 1;
+		byte[] newBytes = new byte[len];
+		BinarySegmentUtils.copyToBytes(binarySection.segments, binarySection.offset + start, newBytes, 0, len);
+		return fromBytes(newBytes);
+	}
+
+	/** Returns a cursor on the first byte of this string, for segments of {@code segSize} bytes. */
+	BinaryStringData.SegmentAndOffset firstSegmentAndOffset(int segSize) {
+		int segIndex = binarySection.offset / segSize;
+		return new BinaryStringData.SegmentAndOffset(segIndex, binarySection.offset % segSize);
+	}
+
+	/** Returns a cursor on the last byte of this string, for segments of {@code segSize} bytes. */
+	BinaryStringData.SegmentAndOffset lastSegmentAndOffset(int segSize) {
+		int lastOffset = binarySection.offset + binarySection.sizeInBytes - 1;
+		int segIndex = lastOffset / segSize;
+		return new BinaryStringData.SegmentAndOffset(segIndex, lastOffset % segSize);
+	}
+
+	private BinaryStringData.SegmentAndOffset startSegmentAndOffset(int segSize) {
+		return inFirstSegment() ? new BinaryStringData.SegmentAndOffset(0, binarySection.offset) : firstSegmentAndOffset(segSize);
+	}
+
+	/**
+	 * CurrentSegment and positionInSegment.
+	 *
+	 * <p>A mutable cursor over the bytes of this string that can move across memory segment
+	 * boundaries. It is a non-static inner class because it reads the enclosing string's
+	 * {@code binarySection.segments}.
+	 */
+	class SegmentAndOffset {
+		// index of the current segment in binarySection.segments
+		int segIndex;
+		// the current segment; null once segIndex has moved outside the segments array
+		MemorySegment segment;
+		// position of the cursor inside the current segment
+		int offset;
+
+		private SegmentAndOffset(int segIndex, int offset) {
+			this.segIndex = segIndex;
+			this.segment = binarySection.segments[segIndex];
+			this.offset = offset;
+		}
+
+		// Re-reads segment for the current segIndex. Out-of-range indices yield null instead of
+		// throwing, so the cursor may step just past either end of the segments array.
+		private void assignSegment() {
+			segment = segIndex >= 0 && segIndex < binarySection.segments.length ?
+				binarySection.segments[segIndex] : null;
+		}
+
+		/** Moves the cursor one byte backwards, switching to the previous segment if needed. */
+		void previousByte(int segSize) {
+			offset--;
+			if (offset == -1) {
+				segIndex--;
+				assignSegment();
+				offset = segSize - 1;
+			}
+		}
+
+		/** Moves the cursor one byte forward, switching to the next segment if needed. */
+		void nextByte(int segSize) {
+			offset++;
+			checkAdvance(segSize);
+		}
+
+		// moves to the start of the next segment once offset has reached the segment end
+		private void checkAdvance(int segSize) {
+			if (offset == segSize) {
+				advance();
+			}
+		}
+
+		private void advance() {
+			segIndex++;
+			assignSegment();
+			offset = 0;
+		}
+
+		/**
+		 * Moves the cursor {@code n} bytes forward, possibly across several segments.
+		 * NOTE(review): every caller in this class passes {@code segments[0].size()} as
+		 * {@code segSize}, so this presumes all segments have the same size — confirm.
+		 */
+		void skipBytes(int n, int segSize) {
+			int remaining = segSize - this.offset;
+			if (remaining > n) {
+				// fast path: the target position stays inside the current segment
+				this.offset += n;
+			} else {
+				while (true) {
+					int toSkip = Math.min(remaining, n);
+					n -= toSkip;
+					if (n <= 0) {
+						this.offset += toSkip;
+						// landing exactly on the segment end moves to the next segment
+						checkAdvance(segSize);
+						return;
+					}
+					advance();
+					remaining = segSize - this.offset;
+				}
+			}
+		}
+
+		/** Returns the byte under the cursor. */
+		byte value() {
+			return this.segment.get(this.offset);
+		}
+	}
+
+	/**
+	 * Returns the number of bytes for a code point with the first byte as `b`.
+	 * Bytes that cannot start a valid UTF-8 sequence (continuation bytes, 0xC0, 0xC1 and
+	 * 0xF8-0xFF) are counted as a single byte.
+	 * @param b The first byte of a code point
+	 */
+	static int numBytesForFirstByte(final byte b) {
+		if (b >= 0) {
+			// 1 byte, 7 bits: 0xxxxxxx
+			return 1;
+		} else if ((b >> 5) == -2 && (b & 0x1e) != 0) {
+			// 2 bytes, 11 bits: 110xxxxx 10xxxxxx
+			// (the 0x1e mask rejects the overlong lead bytes 0xC0 and 0xC1)
+			return 2;
+		} else if ((b >> 4) == -2) {
+			// 3 bytes, 16 bits: 1110xxxx 10xxxxxx 10xxxxxx
+			return 3;
+		} else if ((b >> 3) == -2) {
+			// 4 bytes, 21 bits: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
+			return 4;
+		} else {
+			// Skip the first byte disallowed in UTF-8
+			// Handling errors quietly, same semantics to java String.
+			return 1;
+		}
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/LazyBinaryFormat.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/LazyBinaryFormat.java
new file mode 100644
index 0000000..0b65f81
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/LazyBinaryFormat.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.binary;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import java.io.IOException;
+
+/**
+ * An abstract implementation for {@link BinaryFormat} which is lazily serialized into binary
+ * or lazily deserialized into Java object.
+ *
+ * <p>The reason why we introduce this data structure is in order to save (de)serialization
+ * in nested function calls.
Consider the following function call chain:
+ *
+ * <pre>UDF0(input) -> UDF1(result0) -> UDF2(result1) -> UDF3(result2)</pre>
+ *
+ * <p>If the UDFs return their values in Java object format, such a chain converts back and
+ * forth between Java objects and the binary format at every step:
+ *
+ * <pre>
+ * converterToBinary(UDF0(converterToJavaObject(input))) ->
+ * converterToBinary(UDF1(converterToJavaObject(result0))) ->
+ * converterToBinary(UDF2(converterToJavaObject(result1))) ->
+ * ...
+ * </pre>
+ *
+ * <p>{@link LazyBinaryFormat} avoids this redundant work. An instance is in one of three forms:
+ * <ul>
+ * <li>Binary form</li>
+ * <li>Java object form</li>
+ * <li>Binary and Java object both exist</li>
+ * </ul>
+ *
+ * <p>Conversions are deferred as long as possible: a form is only produced when it is
+ * actually needed.
+ */
+@Internal
+public abstract class LazyBinaryFormat<T> implements BinaryFormat {
+
+	T javaObject;
+	BinarySection binarySection;
+
+	/**
+	 * Creates an instance without a Java object. NOTE(review): this installs a placeholder
+	 * {@link BinarySection} (null segments, offset and size -1) instead of leaving the section
+	 * {@code null}, so {@link #ensureMaterialized(TypeSerializer)} will not materialize it;
+	 * presumably subclasses point the section at real data afterwards — confirm.
+	 */
+	public LazyBinaryFormat() {
+		this(null, -1, -1, null);
+	}
+
+	/** Creates an instance holding both the binary form and the Java object form. */
+	public LazyBinaryFormat(MemorySegment[] segments, int offset, int sizeInBytes, T javaObject) {
+		this(javaObject, new BinarySection(segments, offset, sizeInBytes));
+	}
+
+	/** Creates an instance holding only the binary form. */
+	public LazyBinaryFormat(MemorySegment[] segments, int offset, int sizeInBytes) {
+		this(null, new BinarySection(segments, offset, sizeInBytes));
+	}
+
+	/** Creates an instance holding only the Java object form. */
+	public LazyBinaryFormat(T javaObject) {
+		this(javaObject, null);
+	}
+
+	/** Creates an instance from the given Java object and binary section; either may be null. */
+	public LazyBinaryFormat(T javaObject, BinarySection binarySection) {
+		this.javaObject = javaObject;
+		this.binarySection = binarySection;
+	}
+
+	public T getJavaObject() {
+		return javaObject;
+	}
+
+	public BinarySection getBinarySection() {
+		return binarySection;
+	}
+
+	/**
+	 * Must be public as it is used during code generation.
+	 */
+	public void setJavaObject(T javaObject) {
+		this.javaObject = javaObject;
+	}
+
+	@Override
+	public MemorySegment[] getSegments() {
+		return requireBinarySection().segments;
+	}
+
+	@Override
+	public int getOffset() {
+		return requireBinarySection().offset;
+	}
+
+	@Override
+	public int getSizeInBytes() {
+		return requireBinarySection().sizeInBytes;
+	}
+
+	/**
+	 * Returns the binary section, failing fast when only the Java object form exists.
+	 *
+	 * @throws IllegalStateException if the binary form has not been materialized
+	 */
+	private BinarySection requireBinarySection() {
+		if (binarySection == null) {
+			throw new IllegalStateException("Lazy Binary Format was not materialized");
+		}
+		return binarySection;
+	}
+
+	/**
+	 * Makes sure the binary form exists, serializing the Java object with {@code serializer}
+	 * if necessary. An {@link IOException} is rethrown as a {@link WrappingRuntimeException}.
+	 */
+	public final void ensureMaterialized(TypeSerializer<T> serializer) {
+		if (binarySection != null) {
+			// already materialized
+			return;
+		}
+		try {
+			this.binarySection = materialize(serializer);
+		} catch (IOException e) {
+			throw new WrappingRuntimeException(e);
+		}
+	}
+
+	/**
+	 * Materialize java object to binary format.
+	 * Inherited classes need to hold the information they need.
+	 * (For example, {@link RawValueData} needs javaObjectSerializer).
+	 */
+	protected abstract BinarySection materialize(TypeSerializer<T> serializer) throws IOException;
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/MurmurHashUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/MurmurHashUtils.java
new file mode 100644
index 0000000..6635c66
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/MurmurHashUtils.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.binary;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.MemorySegment;
+
+import static org.apache.flink.core.memory.MemoryUtils.UNSAFE;
+
+/**
+ * 32-bit Murmur3 hashing over raw memory, inspired by Guava's Murmur3_32HashFunction.
+ *
+ * <p>The "ByWords" variants consume whole 4-byte words only. The plain variants additionally
+ * mix in the trailing 0-3 bytes one at a time.
+ */
+@Internal
+final class MurmurHashUtils {
+
+	private static final int C1 = 0xcc9e2d51;
+	private static final int C2 = 0x1b873593;
+	public static final int DEFAULT_SEED = 42;
+
+	private MurmurHashUtils() {
+		// do not instantiate
+	}
+
+	/**
+	 * Hashes bytes read through Unsafe, one 4-byte word at a time, with {@link #DEFAULT_SEED}.
+	 *
+	 * @param base base object of the Unsafe access
+	 * @param offset offset relative to {@code base}
+	 * @param lengthInBytes number of bytes to hash; must be a multiple of 4
+	 * @return the hash code
+	 */
+	public static int hashUnsafeBytesByWords(Object base, long offset, int lengthInBytes) {
+		return hashUnsafeBytesByWords(base, offset, lengthInBytes, DEFAULT_SEED);
+	}
+
+	/**
+	 * Hashes bytes read through Unsafe, of any length, with {@link #DEFAULT_SEED}.
+	 *
+	 * @param base base object of the Unsafe access
+	 * @param offset offset relative to {@code base}
+	 * @param lengthInBytes number of bytes to hash
+	 * @return the hash code
+	 */
+	public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
+		return hashUnsafeBytes(base, offset, lengthInBytes, DEFAULT_SEED);
+	}
+
+	/**
+	 * Hashes bytes of a {@link MemorySegment}, one 4-byte word at a time, with
+	 * {@link #DEFAULT_SEED}.
+	 *
+	 * @param segment the segment to read from
+	 * @param offset start position inside the segment
+	 * @param lengthInBytes number of bytes to hash; must be a multiple of 4
+	 * @return the hash code
+	 */
+	public static int hashBytesByWords(MemorySegment segment, int offset, int lengthInBytes) {
+		return hashBytesByWords(segment, offset, lengthInBytes, DEFAULT_SEED);
+	}
+
+	/**
+	 * Hashes bytes of a {@link MemorySegment}, of any length, with {@link #DEFAULT_SEED}.
+	 *
+	 * @param segment the segment to read from
+	 * @param offset start position inside the segment
+	 * @param lengthInBytes number of bytes to hash
+	 * @return the hash code
+	 */
+	public static int hashBytes(MemorySegment segment, int offset, int lengthInBytes) {
+		return hashBytes(segment, offset, lengthInBytes, DEFAULT_SEED);
+	}
+
+	private static int hashUnsafeBytesByWords(Object base, long offset, int lengthInBytes, int seed) {
+		return fmix(hashUnsafeBytesByInt(base, offset, lengthInBytes, seed), lengthInBytes);
+	}
+
+	private static int hashBytesByWords(MemorySegment segment, int offset, int lengthInBytes, int seed) {
+		return fmix(hashBytesByInt(segment, offset, lengthInBytes, seed), lengthInBytes);
+	}
+
+	private static int hashBytes(MemorySegment segment, int offset, int lengthInBytes, int seed) {
+		final int alignedLength = lengthInBytes - lengthInBytes % 4;
+		int hash = hashBytesByInt(segment, offset, alignedLength, seed);
+		// mix in the trailing bytes one at a time (each byte is sign-extended to an int)
+		for (int pos = alignedLength; pos < lengthInBytes; pos++) {
+			hash = mixH1(hash, mixK1(segment.get(offset + pos)));
+		}
+		return fmix(hash, lengthInBytes);
+	}
+
+	private static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
+		assert (lengthInBytes >= 0) : "lengthInBytes cannot be negative";
+		final int alignedLength = lengthInBytes - lengthInBytes % 4;
+		int hash = hashUnsafeBytesByInt(base, offset, alignedLength, seed);
+		for (int pos = alignedLength; pos < lengthInBytes; pos++) {
+			hash = mixH1(hash, mixK1(UNSAFE.getByte(base, offset + pos)));
+		}
+		return fmix(hash, lengthInBytes);
+	}
+
+	private static int hashUnsafeBytesByInt(Object base, long offset, int lengthInBytes, int seed) {
+		assert (lengthInBytes % 4 == 0);
+		int hash = seed;
+		for (int pos = 0; pos < lengthInBytes; pos += 4) {
+			hash = mixH1(hash, mixK1(UNSAFE.getInt(base, offset + pos)));
+		}
+		return hash;
+	}
+
+	private static int hashBytesByInt(MemorySegment segment, int offset, int lengthInBytes, int seed) {
+		assert (lengthInBytes % 4 == 0);
+		int hash = seed;
+		for (int pos = 0; pos < lengthInBytes; pos += 4) {
+			hash = mixH1(hash, mixK1(segment.getInt(offset + pos)));
+		}
+		return hash;
+	}
+
+	// scrambles one 4-byte block before it is combined into the running hash
+	private static int mixK1(int k1) {
+		return Integer.rotateLeft(k1 * C1, 15) * C2;
+	}
+
+	// combines a scrambled block into the running hash
+	private static int mixH1(int h1, int k1) {
+		return Integer.rotateLeft(h1 ^ k1, 13) * 5 + 0xe6546b64;
+	}
+
+	// Finalization mix - force all bits of a hash block to avalanche
+	private static int fmix(int h1, int length) {
+		return fmix(h1 ^ length);
+	}
+
+	/** Murmur3 32-bit finalizer. */
+	public static int fmix(int h) {
+		int x = h ^ (h >>> 16);
+		x *= 0x85ebca6b;
+		x ^= x >>> 13;
+		x *= 0xc2b2ae35;
+		return x ^ (x >>> 16);
+	}
+
+	/** Murmur3 64-bit finalizer. */
+	public static long fmix(long h) {
+		long x = h ^ (h >>> 33);
+		x *= 0xff51afd7ed558ccdL;
+		x ^= x >>> 33;
+		x *= 0xc4ceb9fe1a85ec53L;
+		return x ^ (x >>> 33);
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/NestedRowData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/NestedRowData.java
new file mode 100644
index 0000000..66bb953
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/NestedRowData.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.binary;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+import static org.apache.flink.table.data.binary.BinaryRowData.calculateBitSetWidthInBytes;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A {@link RowData} whose memory storage structure is exactly the same as that of
+ * {@link BinaryRowData}.
+ *
+ * <p>The only difference is that {@link NestedRowData} is used to store a row value in the
+ * variable-length part of a {@link BinaryRowData}. Therefore every field of a
+ * {@link NestedRowData} (in both its fixed-length and its variable-length part) may cross the
+ * boundary of a memory segment, whereas the fixed-length part of a {@link BinaryRowData} must
+ * fit into its first memory segment. For this reason all accesses here go through the
+ * {@link BinarySegmentUtils} helpers, which take the whole {@code segments} array.
+ *
+ * <p>Layout: the row kind byte at {@code offset}, the null bits starting at bit 8, then one
+ * 8-byte slot per field (see {@link #getFieldOffset(int)}).
+ */
+@Internal
+public final class NestedRowData extends BinarySection implements RowData, TypedSetters {
+
+	private final int arity;
+	// width of the leading bit set area (row kind header and null bits), as computed by
+	// BinaryRowData.calculateBitSetWidthInBytes
+	private final int nullBitsSizeInBytes;
+
+	public NestedRowData(int arity) {
+		checkArgument(arity >= 0);
+		this.arity = arity;
+		this.nullBitsSizeInBytes = calculateBitSetWidthInBytes(arity);
+	}
+
+	/** Returns the absolute position of the 8-byte fixed-length slot of field {@code pos}. */
+	private int getFieldOffset(int pos) {
+		return offset + nullBitsSizeInBytes + pos * 8;
+	}
+
+	private void assertIndexIsValid(int index) {
+		assert index >= 0 : "index (" + index + ") should >= 0";
+		assert index < arity : "index (" + index + ") should < " + arity;
+	}
+
+	@Override
+	public int getArity() {
+		return arity;
+	}
+
+	@Override
+	public RowKind getRowKind() {
+		byte kindValue = BinarySegmentUtils.getByte(segments, offset);
+		return RowKind.fromByteValue(kindValue);
+	}
+
+	@Override
+	public void setRowKind(RowKind kind) {
+		BinarySegmentUtils.setByte(segments, offset, kind.toByteValue());
+	}
+
+	// null bit of field i lives at bit i + 8 because the first byte holds the row kind
+	private void setNotNullAt(int i) {
+		assertIndexIsValid(i);
+		BinarySegmentUtils.bitUnSet(segments, offset, i + 8);
+	}
+
+	/**
+	 * See {@link BinaryRowData#setNullAt(int)}. Also zeroes the field's fixed-length slot.
+	 */
+	@Override
+	public void setNullAt(int i) {
+		assertIndexIsValid(i);
+		BinarySegmentUtils.bitSet(segments, offset, i + 8);
+		BinarySegmentUtils.setLong(segments, getFieldOffset(i), 0);
+	}
+
+	@Override
+	public void setInt(int pos, int value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		BinarySegmentUtils.setInt(segments, getFieldOffset(pos), value);
+	}
+
+	@Override
+	public void setLong(int pos, long value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		BinarySegmentUtils.setLong(segments, getFieldOffset(pos), value);
+	}
+
+	@Override
+	public void setDouble(int pos, double value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		BinarySegmentUtils.setDouble(segments, getFieldOffset(pos), value);
+	}
+
+	/**
+	 * Sets a decimal value. Compact decimals are stored inline in the fixed-length slot. Larger
+	 * ones are written into a 16-byte area of the variable-length part, whose position
+	 * ("cursor") must already be stored in the upper 32 bits of the slot.
+	 *
+	 * <p>A {@code null} value marks the field as null; for non-compact decimals the cursor is
+	 * kept so the field can be updated again later.
+	 */
+	@Override
+	public void setDecimal(int pos, DecimalData value, int precision) {
+		assertIndexIsValid(pos);
+
+		if (DecimalData.isCompact(precision)) {
+			// compact format
+			if (value == null) {
+				// mark as null instead of failing with a NullPointerException
+				setNullAt(pos);
+			} else {
+				setLong(pos, value.toUnscaledLong());
+			}
+		} else {
+			int fieldOffset = getFieldOffset(pos);
+			int cursor = (int) (BinarySegmentUtils.getLong(segments, fieldOffset) >>> 32);
+			assert cursor > 0 : "invalid cursor " + cursor;
+			// zero-out the bytes
+			BinarySegmentUtils.setLong(segments, offset + cursor, 0L);
+			BinarySegmentUtils.setLong(segments, offset + cursor + 8, 0L);
+
+			if (value == null) {
+				setNullAt(pos);
+				// keep the offset for future update
+				BinarySegmentUtils.setLong(segments, fieldOffset, ((long) cursor) << 32);
+			} else {
+
+				byte[] bytes = value.toUnscaledBytes();
+				assert (bytes.length <= 16);
+
+				// Write the bytes to the variable length portion.
+				BinarySegmentUtils.copyFromBytes(segments, offset + cursor, bytes, 0, bytes.length);
+				setLong(pos, ((long) cursor << 32) | ((long) bytes.length));
+			}
+		}
+	}
+
+	/**
+	 * Sets a timestamp value. Compact timestamps store their milliseconds inline. Otherwise the
+	 * milliseconds go to the variable-length part at the cursor kept in the upper 32 bits of
+	 * the slot, and the nano-of-millisecond goes into the lower 32 bits of the slot.
+	 *
+	 * <p>A {@code null} value marks the field as null; for non-compact timestamps the cursor is
+	 * kept so the field can be updated again later.
+	 */
+	@Override
+	public void setTimestamp(int pos, TimestampData value, int precision) {
+		assertIndexIsValid(pos);
+
+		if (TimestampData.isCompact(precision)) {
+			if (value == null) {
+				// mark as null instead of failing with a NullPointerException
+				setNullAt(pos);
+			} else {
+				setLong(pos, value.getMillisecond());
+			}
+		} else {
+			int fieldOffset = getFieldOffset(pos);
+			int cursor = (int) (BinarySegmentUtils.getLong(segments, fieldOffset) >>> 32);
+			assert cursor > 0 : "invalid cursor " + cursor;
+
+			if (value == null) {
+				setNullAt(pos);
+				// zero-out the bytes
+				BinarySegmentUtils.setLong(segments, offset + cursor, 0L);
+				BinarySegmentUtils.setLong(segments, fieldOffset, ((long) cursor) << 32);
+			} else {
+				// write millisecond to variable length portion.
+				BinarySegmentUtils.setLong(segments, offset + cursor, value.getMillisecond());
+				// write nanoOfMillisecond to fixed-length portion.
+				setLong(pos, ((long) cursor << 32) | (long) value.getNanoOfMillisecond());
+			}
+		}
+	}
+
+	@Override
+	public void setBoolean(int pos, boolean value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		BinarySegmentUtils.setBoolean(segments, getFieldOffset(pos), value);
+	}
+
+	@Override
+	public void setShort(int pos, short value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		BinarySegmentUtils.setShort(segments, getFieldOffset(pos), value);
+	}
+
+	@Override
+	public void setByte(int pos, byte value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		BinarySegmentUtils.setByte(segments, getFieldOffset(pos), value);
+	}
+
+	@Override
+	public void setFloat(int pos, float value) {
+		assertIndexIsValid(pos);
+		setNotNullAt(pos);
+		BinarySegmentUtils.setFloat(segments, getFieldOffset(pos), value);
+	}
+
+	@Override
+	public boolean isNullAt(int pos) {
+		assertIndexIsValid(pos);
+		return BinarySegmentUtils.bitGet(segments, offset, pos + 8);
+	}
+
+	@Override
+	public boolean getBoolean(int pos) {
+		assertIndexIsValid(pos);
+		return BinarySegmentUtils.getBoolean(segments, getFieldOffset(pos));
+	}
+
+	@Override
+	public byte getByte(int pos) {
+		assertIndexIsValid(pos);
+		return BinarySegmentUtils.getByte(segments, getFieldOffset(pos));
+	}
+
+	@Override
+	public short getShort(int pos) {
+		assertIndexIsValid(pos);
+		return BinarySegmentUtils.getShort(segments, getFieldOffset(pos));
+	}
+
+	@Override
+	public int getInt(int pos) {
+		assertIndexIsValid(pos);
+		return BinarySegmentUtils.getInt(segments, getFieldOffset(pos));
+	}
+
+	@Override
+	public long getLong(int pos) {
+		assertIndexIsValid(pos);
+		return BinarySegmentUtils.getLong(segments, getFieldOffset(pos));
+	}
+
+	@Override
+	public float getFloat(int pos) {
+		assertIndexIsValid(pos);
+		return BinarySegmentUtils.getFloat(segments, getFieldOffset(pos));
+	}
+
+	@Override
+	public double getDouble(int pos) {
+		assertIndexIsValid(pos);
+		return BinarySegmentUtils.getDouble(segments, getFieldOffset(pos));
+	}
+
+	@Override
+	public StringData getString(int pos) {
+		assertIndexIsValid(pos);
+		int fieldOffset = getFieldOffset(pos);
+		final long offsetAndLen = BinarySegmentUtils.getLong(segments, fieldOffset);
+		return BinarySegmentUtils.readStringData(segments, offset, fieldOffset, offsetAndLen);
+	}
+
+	@Override
+	public DecimalData getDecimal(int pos, int precision, int scale) {
+		assertIndexIsValid(pos);
+
+		if (DecimalData.isCompact(precision)) {
+			return DecimalData.fromUnscaledLong(
+				BinarySegmentUtils.getLong(segments, getFieldOffset(pos)),
+				precision,
+				scale);
+		}
+
+		int fieldOffset = getFieldOffset(pos);
+		final long offsetAndSize = BinarySegmentUtils.getLong(segments, fieldOffset);
+		return BinarySegmentUtils.readDecimalData(segments, offset, offsetAndSize, precision, scale);
+	}
+
+	@Override
+	public TimestampData getTimestamp(int pos, int precision) {
+		assertIndexIsValid(pos);
+
+		if (TimestampData.isCompact(precision)) {
+			return TimestampData.fromEpochMillis(BinarySegmentUtils.getLong(segments, getFieldOffset(pos)));
+		}
+
+		int fieldOffset = getFieldOffset(pos);
+		final long offsetAndNanoOfMilli = BinarySegmentUtils.getLong(segments, fieldOffset);
+		return BinarySegmentUtils.readTimestampData(segments, offset, offsetAndNanoOfMilli);
+	}
+
+	@Override
+	public <T> RawValueData<T> getRawValue(int pos) {
+		assertIndexIsValid(pos);
+		return BinarySegmentUtils.readRawValueData(segments, offset, getLong(pos));
+	}
+
+	@Override
+	public byte[] getBinary(int pos) {
+		assertIndexIsValid(pos);
+		int fieldOffset = getFieldOffset(pos);
+		final long offsetAndLen = BinarySegmentUtils.getLong(segments, fieldOffset);
+		return BinarySegmentUtils.readBinary(segments, offset, fieldOffset, offsetAndLen);
+	}
+
+	@Override
+	public RowData getRow(int pos, int numFields) {
+		assertIndexIsValid(pos);
+		return BinarySegmentUtils.readRowData(segments, numFields, offset, getLong(pos));
+	}
+
+	@Override
+	public ArrayData getArray(int pos) {
+		assertIndexIsValid(pos);
+		return BinarySegmentUtils.readArrayData(segments, offset, getLong(pos));
+	}
+
+	@Override
+	public MapData getMap(int pos) {
+		assertIndexIsValid(pos);
+		return BinarySegmentUtils.readMapData(segments, offset, getLong(pos));
+	}
+
+	/** Returns a deep copy backed by a single newly allocated segment. */
+	public NestedRowData copy() {
+		return copy(new NestedRowData(arity));
+	}
+
+	/**
+	 * Copies this row into {@code reuse}, which must be a {@link NestedRowData} (it is cast
+	 * unconditionally), and returns it.
+	 */
+	public NestedRowData copy(RowData reuse) {
+		return copyInternal((NestedRowData) reuse);
+	}
+
+	private NestedRowData copyInternal(NestedRowData reuse) {
+		byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, sizeInBytes);
+		reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, sizeInBytes);
+		return reuse;
+	}
+
+	@Override
+	public int hashCode() {
+		return BinarySegmentUtils.hashByWords(segments, offset, sizeInBytes);
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/StringUtf8Utils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/StringUtf8Utils.java
new file mode 100644
index 0000000..8e6094b
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/StringUtf8Utils.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.data.binary; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.memory.MemorySegment; + +import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +import static org.apache.flink.table.data.binary.BinarySegmentUtils.allocateReuseBytes; +import static org.apache.flink.table.data.binary.BinarySegmentUtils.allocateReuseChars; + +/** + * Utilities for encoding Strings to UTF-8 and decoding UTF-8 bytes back to Strings. + */ +@Internal +final class StringUtf8Utils { + /** Upper bound of UTF-8 bytes written per UTF-16 char; a surrogate pair yields 4 bytes for 2 chars. */ + private static final int MAX_BYTES_PER_CHAR = 3; + + private StringUtf8Utils() { + // do not instantiate + } + + /** + * Encodes {@code str} into a new UTF-8 byte array; the result must be the same as JDK's String.getBytes.
+ */ + public static byte[] encodeUTF8(String str) { + byte[] bytes = allocateReuseBytes(str.length() * MAX_BYTES_PER_CHAR); + int len = encodeUTF8(str, bytes); + return Arrays.copyOf(bytes, len); + } + /** Encodes {@code str} as UTF-8 into {@code bytes} from index 0 and returns the number of bytes written; {@code bytes} must be large enough. */ + public static int encodeUTF8(String str, byte[] bytes) { + int offset = 0; + int len = str.length(); + int sl = offset + len; + int dp = 0; + int dlASCII = dp + Math.min(len, bytes.length); + + // ASCII-only fast path: copy chars below U+0080 one byte each + while (dp < dlASCII && str.charAt(offset) < '\u0080') { + bytes[dp++] = (byte) str.charAt(offset++); + } + + while (offset < sl) { + char c = str.charAt(offset++); + if (c < 0x80) { + // 1 byte, at most seven bits + bytes[dp++] = (byte) c; + } else if (c < 0x800) { + // 2 bytes, 11 bits + bytes[dp++] = (byte) (0xc0 | (c >> 6)); + bytes[dp++] = (byte) (0x80 | (c & 0x3f)); + } else if (Character.isSurrogate(c)) { + final int uc; + int ip = offset - 1; + if (Character.isHighSurrogate(c)) { + if (sl - ip < 2) { + uc = -1; + } else { + char d = str.charAt(ip + 1); + if (Character.isLowSurrogate(d)) { + uc = Character.toCodePoint(c, d); + } else { + // malformed: a high surrogate not followed by a low surrogate; + // the JDK encoder replaces it with '?', so delegate to the JDK + // to produce exactly the same bytes + return defaultEncodeUTF8(str, bytes); + } + } + } else { + if (Character.isLowSurrogate(c)) { + // malformed: a low surrogate without a preceding high surrogate; + // the JDK encoder replaces it with '?', so delegate to the JDK
+ // to produce exactly the same bytes + return defaultEncodeUTF8(str, bytes); + } else { + uc = c; // unreachable: a surrogate that is not high is low; keeps uc definitely assigned + } + } + // uc < 0 means a high surrogate ended the input; it is written as '?' + if (uc < 0) { + bytes[dp++] = (byte) '?'; + } else { + bytes[dp++] = (byte) (0xf0 | ((uc >> 18))); + bytes[dp++] = (byte) (0x80 | ((uc >> 12) & 0x3f)); + bytes[dp++] = (byte) (0x80 | ((uc >> 6) & 0x3f)); + bytes[dp++] = (byte) (0x80 | (uc & 0x3f)); + offset++; // skip the low surrogate: a pair consumes 2 chars + } + } else { + // 3 bytes, 16 bits + bytes[dp++] = (byte) (0xe0 | ((c >> 12))); + bytes[dp++] = (byte) (0x80 | ((c >> 6) & 0x3f)); + bytes[dp++] = (byte) (0x80 | (c & 0x3f)); + } + } + return dp; + } + /** Fallback encoder: encodes {@code str} with the JDK's UTF-8 encoder, copies the result into {@code bytes} and returns its length. */ + public static int defaultEncodeUTF8(String str, byte[] bytes) { + try { + byte[] buffer = str.getBytes("UTF-8"); + System.arraycopy(buffer, 0, bytes, 0, buffer.length); + return buffer.length; + } catch (UnsupportedEncodingException e) { + throw new RuntimeException("encodeUTF8 error", e); + } + } + /** Decodes {@code byteLen} UTF-8 bytes of {@code input} starting at {@code offset}, falling back to the JDK decoder when the strict decoder rejects the input. */ + public static String decodeUTF8(byte[] input, int offset, int byteLen) { + char[] chars = allocateReuseChars(byteLen); + int len = decodeUTF8Strict(input, offset, byteLen, chars); + if (len < 0) { + return defaultDecodeUTF8(input, offset, byteLen); + } + return new String(chars, 0, len); + } + /** Strictly decodes {@code len} bytes of {@code sa} starting at {@code sp} into {@code da}; returns the number of chars written, or -1 for malformed, overlong, surrogate or truncated input. */ + public static int decodeUTF8Strict(byte[] sa, int sp, int len, char[] da) { + final int sl = sp + len; + int dp = 0; + int dlASCII = Math.min(len, da.length); + + // ASCII-only fast path: copy non-negative (0xxxxxxx) bytes directly + while (dp < dlASCII && sa[sp] >= 0) { + da[dp++] = (char) sa[sp++]; + } + + while (sp < sl) { + int b1 = sa[sp++]; + if (b1 >= 0) { + // 1 byte, 7 bits: 0xxxxxxx + da[dp++] = (char) b1; + } else if ((b1 >> 5) == -2 && (b1 & 0x1e) != 0) { + // 2 bytes, 11 bits: 110xxxxx 10xxxxxx; (b1 & 0x1e) != 0 rejects the overlong leads 0xC0/0xC1 + if (sp < sl) { + int b2 = sa[sp++]; + if ((b2 & 0xc0) != 0x80) { // isNotContinuation(b2) + return -1; + } else { + da[dp++] = (char) (((b1 << 6) ^ b2) ^ (((byte) 0xC0 << 6) ^ ((byte) 0x80))); + } + continue; + } + return -1; + } else if ((b1 >> 4) == -2) { + // 3 bytes, 16 bits: 1110xxxx 10xxxxxx 10xxxxxx + if (sp + 1 < sl) { + int b2 =
+ sa[sp++]; + int b3 = sa[sp++]; + if ((b1 == (byte) 0xe0 && (b2 & 0xe0) == 0x80) + || (b2 & 0xc0) != 0x80 + || (b3 & 0xc0) != 0x80) { // isMalformed3(b1, b2, b3): overlong form or bad continuation byte + return -1; + } else { + char c = (char) ((b1 << 12) ^ (b2 << 6) ^ (b3 ^ + (((byte) 0xE0 << 12) ^ ((byte) 0x80 << 6) ^ ((byte) 0x80)))); + if (Character.isSurrogate(c)) { + return -1; // encoded surrogates are not valid UTF-8 + } else { + da[dp++] = c; + } + } + continue; + } + return -1; + } else if ((b1 >> 3) == -2) { + // 4 bytes, 21 bits: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx + if (sp + 2 < sl) { + int b2 = sa[sp++]; + int b3 = sa[sp++]; + int b4 = sa[sp++]; + int uc = ((b1 << 18) ^ + (b2 << 12) ^ + (b3 << 6) ^ + (b4 ^ (((byte) 0xF0 << 18) ^ ((byte) 0x80 << 12) ^ + ((byte) 0x80 << 6) ^ ((byte) 0x80)))); + // isMalformed4 and shortest-form check; also rejects code points above U+10FFFF + if (((b2 & 0xc0) != 0x80 || (b3 & 0xc0) != 0x80 || (b4 & 0xc0) != 0x80) + || !Character.isSupplementaryCodePoint(uc)) { + return -1; + } else { + da[dp++] = Character.highSurrogate(uc); + da[dp++] = Character.lowSurrogate(uc); + } + continue; + } + return -1; + } else { + return -1; // not a valid UTF-8 lead byte + } + } + return dp; + } + /** Same as {@link #decodeUTF8(byte[], int, int)}, but reads from a {@link MemorySegment}; on fallback the bytes are first copied out of the segment. */ + public static String decodeUTF8(MemorySegment input, int offset, int byteLen) { + char[] chars = allocateReuseChars(byteLen); + int len = decodeUTF8Strict(input, offset, byteLen, chars); + if (len < 0) { + byte[] bytes = allocateReuseBytes(byteLen); + input.get(offset, bytes, 0, byteLen); + return defaultDecodeUTF8(bytes, 0, byteLen); + } + return new String(chars, 0, len); + } + /** Same as {@link #decodeUTF8Strict(byte[], int, int, char[])}, but reads the bytes from a {@link MemorySegment}. */ + public static int decodeUTF8Strict(MemorySegment segment, int sp, int len, char[] da) { + final int sl = sp + len; + int dp = 0; + int dlASCII = Math.min(len, da.length); + + // ASCII-only fast path: copy non-negative (0xxxxxxx) bytes directly + while (dp < dlASCII && segment.get(sp) >= 0) { + da[dp++] = (char) segment.get(sp++); + } + + while (sp < sl) { + int b1 = segment.get(sp++); + if (b1 >= 0) { + // 1 byte, 7 bits: 0xxxxxxx + da[dp++] = (char) b1; + } else if ((b1 >> 5) == -2 && (b1 & 0x1e) != 0) { + // 2 bytes, 11 bits: 110xxxxx 10xxxxxx; (b1 & 0x1e) != 0 rejects the overlong leads 0xC0/0xC1 + if (sp <
+ sl) { + int b2 = segment.get(sp++); + if ((b2 & 0xc0) != 0x80) { // isNotContinuation(b2) + return -1; + } else { + da[dp++] = (char) (((b1 << 6) ^ b2) ^ (((byte) 0xC0 << 6) ^ ((byte) 0x80))); + } + continue; + } + return -1; + } else if ((b1 >> 4) == -2) { + // 3 bytes, 16 bits: 1110xxxx 10xxxxxx 10xxxxxx + if (sp + 1 < sl) { + int b2 = segment.get(sp++); + int b3 = segment.get(sp++); + if ((b1 == (byte) 0xe0 && (b2 & 0xe0) == 0x80) + || (b2 & 0xc0) != 0x80 + || (b3 & 0xc0) != 0x80) { // isMalformed3(b1, b2, b3): overlong form or bad continuation byte + return -1; + } else { + char c = (char) ((b1 << 12) ^ (b2 << 6) ^ (b3 ^ + (((byte) 0xE0 << 12) ^ ((byte) 0x80 << 6) ^ ((byte) 0x80)))); + if (Character.isSurrogate(c)) { + return -1; // encoded surrogates are not valid UTF-8 + } else { + da[dp++] = c; + } + } + continue; + } + return -1; + } else if ((b1 >> 3) == -2) { + // 4 bytes, 21 bits: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx + if (sp + 2 < sl) { + int b2 = segment.get(sp++); + int b3 = segment.get(sp++); + int b4 = segment.get(sp++); + int uc = ((b1 << 18) ^ + (b2 << 12) ^ + (b3 << 6) ^ + (b4 ^ (((byte) 0xF0 << 18) ^ ((byte) 0x80 << 12) ^ + ((byte) 0x80 << 6) ^ ((byte) 0x80)))); + // isMalformed4 and shortest-form check; also rejects code points above U+10FFFF + if (((b2 & 0xc0) != 0x80 || (b3 & 0xc0) != 0x80 || (b4 & 0xc0) != 0x80) + || !Character.isSupplementaryCodePoint(uc)) { + return -1; + } else { + da[dp++] = Character.highSurrogate(uc); + da[dp++] = Character.lowSurrogate(uc); + } + continue; + } + return -1; + } else { + return -1; // not a valid UTF-8 lead byte + } + } + return dp; + } + /** Decodes with the JDK's UTF-8 decoder, which replaces malformed input instead of rejecting it. */ + public static String defaultDecodeUTF8(byte[] bytes, int offset, int len) { + return new String(bytes, offset, len, StandardCharsets.UTF_8); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/TypedSetters.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/TypedSetters.java new file mode 100644 index 0000000..3555170 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/TypedSetters.java @@
-0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.data.binary; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.TimestampData; + +/** + * Provides type-specialized setters that avoid if/else dispatch on the field type and boxing/unboxing. + * This is mainly used by binary formats such as {@link BinaryRowData}. + */ +@Internal +public interface TypedSetters { + /** Sets the field at position {@code pos} to null. */ + void setNullAt(int pos); + /** Sets the boolean {@code value} at position {@code pos}. */ + void setBoolean(int pos, boolean value); + /** Sets the byte {@code value} at position {@code pos}. */ + void setByte(int pos, byte value); + /** Sets the short {@code value} at position {@code pos}. */ + void setShort(int pos, short value); + /** Sets the int {@code value} at position {@code pos}. */ + void setInt(int pos, int value); + /** Sets the long {@code value} at position {@code pos}. */ + void setLong(int pos, long value); + /** Sets the float {@code value} at position {@code pos}. */ + void setFloat(int pos, float value); + /** Sets the double {@code value} at position {@code pos}. */ + void setDouble(int pos, double value); + + /** + * Sets the decimal value at position {@code pos}. + * + * <p>Note on null values: + * If the precision is compact, {@link #setNullAt} may be called when the decimal is null. + * Otherwise {@link #setNullAt} must not be used for a null decimal; instead call + * {@code setDecimal(pos, null, precision)}, because the variable-length part needs to be updated. + */ + void setDecimal(int pos, DecimalData value, int precision); + + /** + * Sets the timestamp value at position {@code pos}.
+ * + * <p>Note on null values: + * If the precision is compact, {@link #setNullAt} may be called when the value is null. + * Otherwise {@link #setNullAt} must not be used for a null value; instead call + * {@code setTimestamp(pos, null, precision)}, because the variable-length part needs to be updated. + */ + void setTimestamp(int pos, TimestampData value, int precision); +}
