umeshdangat commented on code in PR #3434: URL: https://github.com/apache/flink-cdc/pull/3434#discussion_r1706193569
########## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryArrayData.java: ########## @@ -0,0 +1,574 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.data.binary; + +import org.apache.flink.cdc.common.data.ArrayData; +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.LocalZonedTimestampData; +import org.apache.flink.cdc.common.data.MapData; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.data.StringData; +import org.apache.flink.cdc.common.data.TimestampData; +import org.apache.flink.cdc.common.data.ZonedTimestampData; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.utils.DataTypeUtils; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; + +import java.lang.reflect.Array; + +import static org.apache.flink.core.memory.MemoryUtils.UNSAFE; + +/** + * A binary implementation of {@link ArrayData} which is backed by {@link MemorySegment}s. + * + * <p>For fields that hold fixed-length primitive types, such as long, double or int, they are + * stored compacted in bytes, just like the original java array. + * + * <p>The binary layout of {@link BinaryArrayData}: + * + * <pre> + * [size(int)] + [null bits(4-byte word boundaries)] + [values or offset&length] + [variable length part]. + * </pre> + */ +public final class BinaryArrayData extends BinarySection implements ArrayData { + + /** Offset for Arrays. */ + private static final int BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); + + private static final int BOOLEAN_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(boolean[].class); + private static final int SHORT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(short[].class); + private static final int INT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(int[].class); + private static final int LONG_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(long[].class); + private static final int FLOAT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(float[].class); + private static final int DOUBLE_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(double[].class); + + public static int calculateHeaderInBytes(int numFields) { + return 4 + ((numFields + 31) / 32) * 4; + } + + /** + * It store real value when type is primitive. It store the length and offset of variable-length + * part when type is string, map, etc. + */ + public static int calculateFixLengthPartSize(DataType type) { + // ordered by type root definition + switch (type.getTypeRoot()) { + case BOOLEAN: + case TINYINT: + return 1; + case CHAR: + case VARCHAR: + case BINARY: + case VARBINARY: + case DECIMAL: + case BIGINT: + case DOUBLE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + case ARRAY: + case MAP: + case ROW: + // long and double are 8 bytes; + // otherwise it stores the length and offset of the variable-length part for types + // such as is string, map, etc. + return 8; + case TIMESTAMP_WITH_TIME_ZONE: + throw new UnsupportedOperationException(); + case SMALLINT: + return 2; + case INTEGER: + case FLOAT: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + return 4; + default: + throw new IllegalArgumentException(); + } + } + + // The number of elements in this array + private int size; + + /** The position to start storing array elements. */ + private int elementOffset; + + public BinaryArrayData() {} + + private void assertIndexIsValid(int ordinal) { Review Comment: done. ########## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryArrayData.java: ########## @@ -0,0 +1,574 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.data.binary; + +import org.apache.flink.cdc.common.data.ArrayData; +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.LocalZonedTimestampData; +import org.apache.flink.cdc.common.data.MapData; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.data.StringData; +import org.apache.flink.cdc.common.data.TimestampData; +import org.apache.flink.cdc.common.data.ZonedTimestampData; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.utils.DataTypeUtils; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; + +import java.lang.reflect.Array; + +import static org.apache.flink.core.memory.MemoryUtils.UNSAFE; + +/** + * A binary implementation of {@link ArrayData} which is backed by {@link MemorySegment}s. + * + * <p>For fields that hold fixed-length primitive types, such as long, double or int, they are + * stored compacted in bytes, just like the original java array. + * + * <p>The binary layout of {@link BinaryArrayData}: + * + * <pre> + * [size(int)] + [null bits(4-byte word boundaries)] + [values or offset&length] + [variable length part]. + * </pre> + */ +public final class BinaryArrayData extends BinarySection implements ArrayData { + + /** Offset for Arrays. */ + private static final int BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); + + private static final int BOOLEAN_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(boolean[].class); + private static final int SHORT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(short[].class); + private static final int INT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(int[].class); + private static final int LONG_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(long[].class); + private static final int FLOAT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(float[].class); + private static final int DOUBLE_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(double[].class); + + public static int calculateHeaderInBytes(int numFields) { + return 4 + ((numFields + 31) / 32) * 4; + } + + /** + * It store real value when type is primitive. It store the length and offset of variable-length + * part when type is string, map, etc. + */ + public static int calculateFixLengthPartSize(DataType type) { + // ordered by type root definition + switch (type.getTypeRoot()) { + case BOOLEAN: + case TINYINT: + return 1; + case CHAR: + case VARCHAR: + case BINARY: + case VARBINARY: + case DECIMAL: + case BIGINT: + case DOUBLE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + case ARRAY: + case MAP: + case ROW: + // long and double are 8 bytes; + // otherwise it stores the length and offset of the variable-length part for types + // such as is string, map, etc. + return 8; + case TIMESTAMP_WITH_TIME_ZONE: + throw new UnsupportedOperationException(); + case SMALLINT: + return 2; + case INTEGER: + case FLOAT: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + return 4; + default: + throw new IllegalArgumentException(); + } + } + + // The number of elements in this array + private int size; + + /** The position to start storing array elements. */ + private int elementOffset; + + public BinaryArrayData() {} + + private void assertIndexIsValid(int ordinal) { + assert ordinal >= 0 : "ordinal (" + ordinal + ") should >= 0"; + assert ordinal < size : "ordinal (" + ordinal + ") should < " + size; + } + + private int getElementOffset(int ordinal, int elementSize) { + return elementOffset + ordinal * elementSize; + } + + @Override + public int size() { + return size; + } + + @Override + public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) { + // Read the number of elements from the first 4 bytes. + final int size = BinarySegmentUtils.getInt(segments, offset); + assert size >= 0 : "size (" + size + ") should >= 0"; + + this.size = size; Review Comment: done. ########## flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryArrayData.java: ########## @@ -0,0 +1,574 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.data.binary; + +import org.apache.flink.cdc.common.data.ArrayData; +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.LocalZonedTimestampData; +import org.apache.flink.cdc.common.data.MapData; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.data.StringData; +import org.apache.flink.cdc.common.data.TimestampData; +import org.apache.flink.cdc.common.data.ZonedTimestampData; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.utils.DataTypeUtils; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; + +import java.lang.reflect.Array; + +import static org.apache.flink.core.memory.MemoryUtils.UNSAFE; + +/** + * A binary implementation of {@link ArrayData} which is backed by {@link MemorySegment}s. + * Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
