wuchong commented on code in PR #2040:
URL: https://github.com/apache/fluss/pull/2040#discussion_r2573470645


##########
fluss-common/src/main/java/org/apache/fluss/row/decode/CompactedRowDecoder.java:
##########
@@ -40,8 +40,12 @@ public CompactedRow decode(byte[] values) {
 
     @Override
     public CompactedRow decode(MemorySegment segment, int offset, int sizeInBytes) {
+        // Copy the data to avoid sharing the underlying MemorySegment
+        byte[] rowBytes = new byte[sizeInBytes];
+        segment.get(offset, rowBytes, 0, sizeInBytes);
+
         CompactedRow compactedRow = new CompactedRow(fieldDataTypes.length, deserializer);
-        compactedRow.pointTo(segment, offset, sizeInBytes);
+        compactedRow.pointTo(MemorySegment.wrap(rowBytes), 0, sizeInBytes);

Review Comment:
   We should avoid the deep copy here; it introduces a performance regression.
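   For reference, a minimal sketch of the zero-copy shape this change moved away from (the trailing `return` is an assumption, since the hunk cuts off before it):
   
   ```java
   @Override
   public CompactedRow decode(MemorySegment segment, int offset, int sizeInBytes) {
       CompactedRow compactedRow = new CompactedRow(fieldDataTypes.length, deserializer);
       // Point directly at the caller's segment instead of copying the bytes,
       // so the hot decode path stays free of per-row allocations and copies.
       compactedRow.pointTo(segment, offset, sizeInBytes);
       return compactedRow; // assumed: the method returns the pointed-to row
   }
   ```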



##########
fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java:
##########
@@ -223,7 +223,10 @@ public CloseableIterator<LogRecord> records(ReadContext context) {
                         rowType,
                         context.getVectorSchemaRoot(schemaId),
                         context.getBufferAllocator(),
-                        timestamp);
+                        timestamp,
+                        context instanceof LogRecordReadContext
+                                ? (LogRecordReadContext) context
+                                : null);

Review Comment:
   This forced cast is hacky and error-prone. It may break in the future if we introduce another `ReadContext` implementation, in which case `null` would silently be passed here.
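   One possible direction (a sketch only; it assumes `ReadContext` is an interface we control, with the method signatures inferred from the call sites in this hunk, and it reuses the `registerBatchRoot` name from this PR): put the hook on `ReadContext` itself with a no-op default, so the call site needs no cast.
   
   ```java
   public interface ReadContext {
       VectorSchemaRoot getVectorSchemaRoot(int schemaId);
       BufferAllocator getBufferAllocator();
       // ... other existing methods unchanged ...
   
       // No-op by default; LogRecordReadContext overrides this to track per-batch roots.
       default void registerBatchRoot(VectorSchemaRoot batchRoot) {}
   }
   ```
   
   The `records()` call site can then pass `context` directly, and any future implementation simply inherits the no-op.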



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkArrayTypeITCase.java:
##########
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
+import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Integration tests for Array type support in Flink connector. */
+abstract class FlinkArrayTypeITCase extends AbstractTestBase {

Review Comment:
   IT cases are very heavy, so we should avoid adding too many tests for a single purpose. I suggest the following updates:
   
   1. Rename `FlinkArrayTypeITCase` to `FlinkComplexTypeITCase` to cover future Map and Row types.
   2. Add a test for a LOG TABLE that covers all array types (ARRAY of non-null primitive types, ARRAY of nullable element types, and several nested ARRAY types). Write records into the table (including null elements when the element type is nullable) and read them back.
   3. Add a test for a PK TABLE with the same types. Also write to and read from the table, but test updating and deleting as well. Take `org.apache.fluss.flink.source.FlinkTableSourceITCase#testReadKvTableWithScanStartupModeEqualsFull` as an example of how to verify reading a kv table for both snapshot and incremental reads. Then test a lookup join against the table.
   4. Add an exception test verifying that the Array type can't be used as a primary key, bucket key, or partition key (see the sketch below).
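   For item 4, a rough sketch of the exception test (the table name and DDL are hypothetical, `assertThatThrownBy` comes from AssertJ, and the asserted message fragment is an assumption to be matched against the real validation error; bucket key and partition key cases would follow the same pattern):
   
   ```java
   @Test
   void testArrayTypeNotAllowedAsPrimaryKey() {
       // Hypothetical DDL declaring an ARRAY column as the primary key,
       // which table creation is expected to reject.
       assertThatThrownBy(
                       () ->
                               tEnv.executeSql(
                                               "create table array_pk_t ("
                                                       + " arr ARRAY<INT> NOT NULL,"
                                                       + " v INT,"
                                                       + " primary key (arr) NOT ENFORCED)")
                                       .await())
               .hasMessageContaining("ARRAY"); // assumed message fragment
   }
   ```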



##########
fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java:
##########
@@ -159,28 +159,94 @@ public static ArrowReader createArrowReader(
             MemorySegment segment,
             int arrowOffset,
             int arrowLength,
-            VectorSchemaRoot schemaRoot,
+            VectorSchemaRoot sharedSchemaRoot,
             BufferAllocator allocator,
-            RowType rowType) {
+            RowType rowType,
+            org.apache.fluss.record.LogRecordReadContext readContext) {
         ByteBuffer arrowBatchBuffer = segment.wrap(arrowOffset, arrowLength);
+
         try (ReadChannel channel =
                         new ReadChannel(new ByteBufferReadableChannel(arrowBatchBuffer));
                 ArrowRecordBatch batch = deserializeRecordBatch(channel, allocator)) {
+
+            // Create a new VectorSchemaRoot for each batch to avoid data contamination
+            // when the shared root is reused for subsequent batches
+            VectorSchemaRoot batchSchemaRoot =
+                    VectorSchemaRoot.create(sharedSchemaRoot.getSchema(), allocator);
+
             VectorLoader vectorLoader =
-                    new VectorLoader(schemaRoot, ArrowCompressionFactory.INSTANCE);
+                    new VectorLoader(batchSchemaRoot, ArrowCompressionFactory.INSTANCE);
             vectorLoader.load(batch);
+
+            // Fix buffer writerIndex after VectorLoader.load
+            // VectorLoader sets capacity but not writerIndex for buffers
+            fixVectorBuffers(batchSchemaRoot);
+
             List<ColumnVector> columnVectors = new ArrayList<>();
-            List<FieldVector> fieldVectors = schemaRoot.getFieldVectors();
+            List<FieldVector> fieldVectors = batchSchemaRoot.getFieldVectors();
             for (int i = 0; i < fieldVectors.size(); i++) {
                 columnVectors.add(
                        createArrowColumnVector(fieldVectors.get(i), rowType.getTypeAt(i)));
             }
-            return new ArrowReader(schemaRoot, columnVectors.toArray(new ColumnVector[0]));
+
+            // Register the batch root with the read context so it can be closed later
+            if (readContext != null) {
+                readContext.registerBatchRoot(batchSchemaRoot);
+            }
+
+            return new ArrowReader(batchSchemaRoot, columnVectors.toArray(new ColumnVector[0]));
         } catch (IOException e) {
            throw new RuntimeException("Failed to deserialize ArrowRecordBatch.", e);
         }
     }
 
+    /**
+     * Fixes writerIndex for all buffers in all vectors after VectorLoader.load().
+     * VectorLoader.load() sets the capacity but not the writerIndex for buffers.
+     */
+    private static void fixVectorBuffers(VectorSchemaRoot schemaRoot) {

Review Comment:
   Why do we need this for the array type? Why didn't we need it before?



##########
fluss-common/src/main/java/org/apache/fluss/row/encode/CompactedRowEncoder.java:
##########
@@ -60,8 +60,12 @@ public void encodeField(int pos, Object value) {
 
     @Override
     public CompactedRow finishRow() {
+        int rowSize = writer.position();
+        byte[] rowBytes = new byte[rowSize];
+        writer.segment().get(0, rowBytes, 0, rowSize);
+
         CompactedRow row = new CompactedRow(fieldDataTypes.length, compactedRowDeserializer);
-        row.pointTo(writer.segment(), 0, writer.position());
+        row.pointTo(org.apache.fluss.memory.MemorySegment.wrap(rowBytes), 0, rowSize);

Review Comment:
   We should avoid the deep copy here as well; it introduces a performance regression.
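   Same concern as in `CompactedRowDecoder`: a sketch of the previous zero-copy shape (the trailing `return` is an assumption, since the hunk cuts off before it). A caller that needs the row to outlive the next `finishRow()` call can copy at its own boundary instead of every row paying for it:
   
   ```java
   @Override
   public CompactedRow finishRow() {
       CompactedRow row = new CompactedRow(fieldDataTypes.length, compactedRowDeserializer);
       // Reuse the writer's segment directly; no per-row byte[] allocation or copy.
       row.pointTo(writer.segment(), 0, writer.position());
       return row; // assumed return
   }
   ```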



##########
fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java:
##########
@@ -164,6 +168,7 @@ private LogRecordReadContext(
         this.bufferAllocator = bufferAllocator;
         this.selectedFieldGetters = selectedFieldGetters;
         this.projectionPushDowned = projectionPushDowned;
+        this.batchRoots = Collections.synchronizedList(new ArrayList<>());

Review Comment:
   `synchronizedList` performs poorly; please avoid using it.
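   A sketch of the alternative, under the assumption (not verified here) that the read context is only accessed from a single reader thread, so a plain list suffices:
   
   ```java
   // Plain list; avoids the per-call lock of Collections.synchronizedList.
   private final List<VectorSchemaRoot> batchRoots = new ArrayList<>();
   
   void registerBatchRoot(VectorSchemaRoot batchRoot) {
       batchRoots.add(batchRoot);
   }
   
   // Release the per-batch roots wherever the context is already closed
   // (this method name is hypothetical).
   void closeBatchRoots() {
       batchRoots.forEach(VectorSchemaRoot::close);
       batchRoots.clear();
   }
   ```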



##########
fluss-common/src/main/java/org/apache/fluss/row/serializer/ArraySerializer.java:
##########
@@ -113,7 +113,7 @@ public BinaryArray toAlignedArray(InternalArray from) {
             }
             reuseWriter.complete();
 
-            return reuseArray;
+            return reuseArray.copy();

Review Comment:
   We should avoid `copy()`; it introduces a performance regression.
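   If a particular caller really needs to retain the aligned array beyond the next call, it can copy at that call site instead of paying for a copy on every serialization (a sketch; `arraySerializer` and `input` are illustration variables):
   
   ```java
   // Hot path: the serializer keeps returning its reuse object, no extra allocation.
   BinaryArray aligned = arraySerializer.toAlignedArray(input);
   
   // Only the rare caller that must hold on to the value pays for the copy.
   BinaryArray retained = aligned.copy();
   ```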



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkArrayTypeITCase.java:
##########
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
+import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Integration tests for Array type support in Flink connector. */
+abstract class FlinkArrayTypeITCase extends AbstractTestBase {
+
+    @RegisterExtension
+    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+            FlussClusterExtension.builder().setNumOfTabletServers(3).build();
+
+    static final String CATALOG_NAME = "testcatalog";
+    static final String DEFAULT_DB = "defaultdb";
+
+    protected StreamExecutionEnvironment env;
+    protected StreamTableEnvironment tEnv;
+    protected TableEnvironment tBatchEnv;
+
+    @BeforeEach
+    void before() {
+        String bootstrapServers = FLUSS_CLUSTER_EXTENSION.getBootstrapServers();
+
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+
+        tEnv = StreamTableEnvironment.create(env);
+        tEnv.executeSql(
+                String.format(
+                        "create catalog %s with ('type' = 'fluss', '%s' = '%s')",
+                        CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers));
+        tEnv.executeSql("use catalog " + CATALOG_NAME);
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+        tBatchEnv =
+                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+        tBatchEnv.executeSql(
+                String.format(
+                        "create catalog %s with ('type' = 'fluss', '%s' = '%s')",
+                        CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers));
+        tBatchEnv.executeSql("use catalog " + CATALOG_NAME);
+        tBatchEnv
+                .getConfig()
+                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+        tEnv.executeSql("create database " + DEFAULT_DB);
+        tEnv.useDatabase(DEFAULT_DB);
+        tBatchEnv.useDatabase(DEFAULT_DB);
+    }
+
+    @AfterEach
+    void after() {
+        tEnv.useDatabase(BUILTIN_DATABASE);
+        tEnv.executeSql(String.format("drop database %s cascade", DEFAULT_DB));
+    }
+
+    @Test
+    void testSimpleLogTableWithSinkAPI() throws Exception {

Review Comment:
   This test is not about the array type and is not needed.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java:
##########
@@ -64,12 +73,25 @@ public RowDataSerializationSchema(boolean isAppendOnly, boolean ignoreDelete) {
     @Override
     public void open(InitializationContext context) throws Exception {
         this.converter = new FlinkAsFlussRow();
+        // For primary key tables (non-append-only), we need to encode the row immediately
+        // to avoid issues with Flink reusing RowData objects

Review Comment:
   I don't understand this. If it's for primary key tables, the `UpsertWriter` will materialize/encode the row into binary format, which already avoids the object-reuse problem.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
