herunkang2018 commented on code in PR #3434:
URL: https://github.com/apache/flink-cdc/pull/3434#discussion_r1897949986


##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryMapData.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.data.binary;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.data.MapData;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.cdc.common.utils.Preconditions.checkArgument;
+
+/**
+ * [4 byte(keyArray size in bytes)] + [Key BinaryArray] + [Value BinaryArray].
+ *
+ * <p>{@code BinaryMap} are influenced by Apache Spark UnsafeMapData.

Review Comment:
   Is this a typo, BinaryMap -> BinaryMapData?
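
   For reference, the layout in the BinaryMapData Javadoc (`[4 byte(keyArray size in bytes)] + [Key BinaryArray] + [Value BinaryArray]`) can be read back roughly as follows. This is a minimal sketch on a plain `byte[]`. `MapLayoutSketch` and `split` are hypothetical names. The assumption that the leading int uses native byte order mirrors Flink's `MemorySegment#getInt` and is not confirmed by this PR.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

// Hypothetical helper (not part of the PR): locates the key and value sections of a map
// serialized as [4-byte key-array size in bytes] + [key BinaryArray] + [value BinaryArray].
final class MapLayoutSketch {

    /**
     * Returns {keyOffset, keyBytes, valueOffset, valueBytes} for a map occupying
     * sizeInBytes bytes starting at offset. Assumption: the leading int is stored in the
     * platform's native byte order, which is how Flink's MemorySegment#getInt reads it.
     */
    static int[] split(byte[] bytes, int offset, int sizeInBytes) {
        int keyArrayBytes = ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder()).getInt(offset);
        // Everything after the 4-byte header and the key array belongs to the value array.
        int valueArrayBytes = sizeInBytes - 4 - keyArrayBytes;
        if (keyArrayBytes < 0 || valueArrayBytes < 0) {
            throw new IllegalArgumentException("Invalid key array size: " + keyArrayBytes);
        }
        return new int[] {offset + 4, keyArrayBytes, offset + 4 + keyArrayBytes, valueArrayBytes};
    }

    public static void main(String[] args) {
        // 24-byte map: 4-byte header, 8 bytes of key array, 12 bytes of value array.
        byte[] bytes = new byte[24];
        ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder()).putInt(0, 8);
        System.out.println(Arrays.toString(split(bytes, 0, bytes.length))); // [4, 8, 12, 12]
    }
}
```

   The size prefix is what makes the layout decodable. The value array starts immediately after the key array, so its start offset cannot be known without first reading the key array's byte length.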



##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryArrayData.java:
##########
@@ -0,0 +1,623 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.data.binary;
+
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.MapData;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.StringData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.ZonedTimestampData;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import java.lang.reflect.Array;
+
+import static org.apache.flink.core.memory.MemoryUtils.UNSAFE;
+
+/**
+ * A binary implementation of {@link ArrayData} which is backed by {@link MemorySegment}s.
+ *
+ * <p>This class provides a way to store array data in a binary format that is compact and
+ * efficient. It uses {@link MemorySegment}s to manage the binary representation of the data,
+ * allowing for efficient storage and access.
+ *
+ * <p>The binary layout of {@link BinaryArrayData} is structured as follows:
+ *
+ * <pre>
+ * [size(int)] + [null bits(4-byte word boundaries)] + [values or offset&length] + [variable length part].
+ * </pre>
+ *
+ * <ul>
+ *   <li><b>size:</b> The first 4 bytes store the number of elements in the array.
+ *   <li><b>null bits:</b> A bitmap to track null values, aligned to 4-byte word boundaries. Each
+ *       bit represents whether an element is null.
+ *   <li><b>values or offset&length:</b> The values of the array elements. For fixed-length
+ *       primitive types, the values are stored directly. For variable-length types (e.g., strings,
+ *       maps), this part stores the offset and length of the actual data in the variable length
+ *       part.
+ *   <li><b>variable length part:</b> This part of the memory segment stores the actual data for
+ *       variable-length types (e.g., strings, maps).
+ * </ul>
+ *
+ * <p>The header size is calculated based on the number of elements in the array, ensuring efficient
+ * alignment and access.
+ *
+ * <p>For fields that hold fixed-length primitive types, such as long, double, or int, they are
+ * stored compactly in bytes, just like the original Java array.
+ *
+ * <p>The class also provides methods to convert the binary data back into Java primitive arrays,
+ * handling various types such as boolean, byte, short, int, long, float, and double.
+ */
+public final class BinaryArrayData extends BinarySection implements ArrayData {
+
+    /** Offset for Arrays. */
+    private static final int BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+
+    private static final int BOOLEAN_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(boolean[].class);
+    private static final int SHORT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(short[].class);
+    private static final int INT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(int[].class);
+    private static final int LONG_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(long[].class);
+    private static final int FLOAT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(float[].class);
+    private static final int DOUBLE_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(double[].class);
+
+    /**
+     * Calculates the size of the header in bytes for an array with the specified number of fields.
+     *
+     * <p>The header consists of:
+     *
+     * <ul>
+     *   <li>4 bytes to store the size of the array (number of elements).
+     *   <li>A bitmap to track null values, where each bit represents whether an element is null.
+     *       This bitmap is aligned to 4-byte word boundaries for efficient memory access and to
+     *       facilitate the use of bitwise operations.
+     * </ul>
+     *
+     * <p>The size of the bitmap is determined by the number of elements in the array:
+     *
+     * <ul>
+     *   <li>Each element requires 1 bit in the bitmap.
+     *   <li>The total number of bits is rounded up to the nearest multiple of 32 to ensure
+     *       alignment to 4-byte word boundaries (i.e., a 32-bit integer).
+     * </ul>
+     *
+     * <p>The formula for calculating the size of the header is:
+     *
+     * <pre>
+     *   header size = 4 bytes (for array size) + ((numFields + 31) / 32) * 4 bytes (for null bitmap)
+     * </pre>
+     *
+     * @param numFields the number of elements in the array
+     * @return the size of the header in bytes
+     */
+    public static int calculateHeaderInBytes(int numFields) {
+        return 4 + ((numFields + 31) / 32) * 4;
+    }
+
+    /**
+     * It store real value when type is primitive. It store the length and offset of variable-length
+     * part when type is string, map, etc.
+     */
+    public static int calculateFixLengthPartSize(DataType type) {
+        // ordered by type root definition
+        switch (type.getTypeRoot()) {
+            case BOOLEAN:
+            case TINYINT:
+                return 1;
+            case CHAR:
+            case VARCHAR:
+            case BINARY:
+            case VARBINARY:
+            case DECIMAL:
+            case BIGINT:
+            case DOUBLE:
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+            case ARRAY:
+            case MAP:
+            case ROW:
+                // long and double are 8 bytes;
+                // otherwise it stores the length and offset of the variable-length part for types
+                // such as is string, map, etc.
+                return 8;
+            case TIMESTAMP_WITH_TIME_ZONE:
+                throw new UnsupportedOperationException();
+            case SMALLINT:
+                return 2;
+            case INTEGER:
+            case FLOAT:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+                return 4;
+            default:
+                throw new IllegalArgumentException();
+        }
+    }
+
+    // The number of elements in this array
+    private int size;
+
+    /** The position to start storing array elements. */

Review Comment:
   Minor comment: would you like to unify the code comment format? It seems two comment formats are used here.

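   To make `calculateHeaderInBytes` and the "values or offset&length" region concrete, here is a minimal sketch of how an element's position and null bit could be located on a plain `byte[]`. `ArrayLayoutSketch` and its methods are hypothetical names. The bit order inside the null bitmap (bit `i` in byte `i / 8` at position `i % 8`, starting right after the 4-byte size field) is an assumption borrowed from Apache Flink's `BinarySegmentUtils.bitGet`. Element widths follow `calculateFixLengthPartSize` (4 bytes for INTEGER).

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical sketch of [size(int)] + [null bits] + [values or offset&length] + [variable part].
final class ArrayLayoutSketch {

    // Same formula as calculateHeaderInBytes in the diff: 4-byte size + bitmap in whole 4-byte words.
    static int headerInBytes(int numFields) {
        return 4 + ((numFields + 31) / 32) * 4;
    }

    // Byte offset of element `ordinal` whose fixed-length slot is `elementSize` bytes wide.
    static int elementOffset(int baseOffset, int numElements, int ordinal, int elementSize) {
        return baseOffset + headerInBytes(numElements) + ordinal * elementSize;
    }

    // Assumption: null bit i lives in byte (i >> 3) of the bitmap, at bit position (i & 7).
    static boolean isNullAt(byte[] bytes, int baseOffset, int ordinal) {
        byte b = bytes[baseOffset + 4 + (ordinal >> 3)];
        return (b & (1 << (ordinal & 7))) != 0;
    }

    static void setNullAt(byte[] bytes, int baseOffset, int ordinal) {
        bytes[baseOffset + 4 + (ordinal >> 3)] |= (byte) (1 << (ordinal & 7));
    }

    public static void main(String[] args) {
        // A 3-element INTEGER array with 4-byte slots; element 1 is null.
        int n = 3;
        int elementSize = 4;
        byte[] bytes = new byte[headerInBytes(n) + n * elementSize];
        ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
        buf.putInt(0, n); // number of elements
        buf.putInt(elementOffset(0, n, 0, elementSize), 10);
        setNullAt(bytes, 0, 1);
        buf.putInt(elementOffset(0, n, 2, elementSize), 30);

        System.out.println(headerInBytes(n)); // 8
        System.out.println(buf.getInt(elementOffset(0, n, 2, elementSize))); // 30
        System.out.println(isNullAt(bytes, 0, 1)); // true
    }
}
```

   Rounding the bitmap up to whole 4-byte words keeps the fixed-length region 4-byte aligned. For example, 32 elements still fit in an 8-byte header, while 33 elements need 12 bytes.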

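   For variable-length elements (strings, maps, etc.), `calculateFixLengthPartSize` reserves an 8-byte slot that holds the offset and length of the data in the variable-length part. The sketch below assumes the convention used by Apache Flink's binary writers: the offset relative to the start of the array goes in the high 32 bits and the length goes in the low 32 bits. It deliberately ignores the inline encoding for values of at most 7 bytes and the 8-byte word alignment of the variable-length part. `OffsetAndLengthSketch` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: packing (offset, length) of a variable-length element into one 8-byte slot.
final class OffsetAndLengthSketch {

    // Assumption: relative offset in the high 32 bits, length in the low 32 bits.
    static long pack(int relativeOffset, int length) {
        return ((long) relativeOffset << 32) | (length & 0xFFFFFFFFL);
    }

    static int offsetOf(long packed) {
        return (int) (packed >>> 32);
    }

    static int lengthOf(long packed) {
        return (int) packed;
    }

    public static void main(String[] args) {
        byte[] payload = "hello flink".getBytes(StandardCharsets.UTF_8);
        // One-element VARCHAR array: 8-byte header, one 8-byte slot, then the variable-length part.
        int header = 4 + ((1 + 31) / 32) * 4;
        int varPartStart = header + 8;
        byte[] bytes = new byte[varPartStart + payload.length];
        ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
        buf.putInt(0, 1); // number of elements
        System.arraycopy(payload, 0, bytes, varPartStart, payload.length);
        buf.putLong(header, pack(varPartStart, payload.length));

        // Reading back: decode the slot, then slice the variable-length part.
        long slot = buf.getLong(header);
        String s = new String(bytes, offsetOf(slot), lengthOf(slot), StandardCharsets.UTF_8);
        System.out.println(s); // hello flink
    }
}
```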

##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryMapData.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.data.binary;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.data.MapData;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.cdc.common.utils.Preconditions.checkArgument;
+
+/**

Review Comment:
   Could you add more description for this class?


