wuchong commented on code in PR #2040:
URL: https://github.com/apache/fluss/pull/2040#discussion_r2573470645
##########
fluss-common/src/main/java/org/apache/fluss/row/decode/CompactedRowDecoder.java:
##########
@@ -40,8 +40,12 @@ public CompactedRow decode(byte[] values) {
@Override
public CompactedRow decode(MemorySegment segment, int offset, int sizeInBytes) {
+ // Copy the data to avoid sharing the underlying MemorySegment
+ byte[] rowBytes = new byte[sizeInBytes];
+ segment.get(offset, rowBytes, 0, sizeInBytes);
+
CompactedRow compactedRow = new CompactedRow(fieldDataTypes.length, deserializer);
- compactedRow.pointTo(segment, offset, sizeInBytes);
+ compactedRow.pointTo(MemorySegment.wrap(rowBytes), 0, sizeInBytes);
Review Comment:
We should avoid the deep copy; it introduces a performance regression.
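
A minimal sketch of the zero-copy shape being asked for here, essentially keeping the removed `pointTo` call; the trailing `return` is assumed from the method signature in the hunk, not copied from the PR:

```java
@Override
public CompactedRow decode(MemorySegment segment, int offset, int sizeInBytes) {
    CompactedRow compactedRow = new CompactedRow(fieldDataTypes.length, deserializer);
    // Point at the caller's segment instead of copying sizeInBytes bytes per row.
    // Safe as long as the row is consumed before the segment is reused.
    compactedRow.pointTo(segment, offset, sizeInBytes);
    return compactedRow;
}
```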
##########
fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java:
##########
@@ -223,7 +223,10 @@ public CloseableIterator<LogRecord> records(ReadContext context) {
rowType,
context.getVectorSchemaRoot(schemaId),
context.getBufferAllocator(),
- timestamp);
+ timestamp,
+ context instanceof LogRecordReadContext
+ ? (LogRecordReadContext) context
+ : null);
Review Comment:
This forced cast is hacky and error-prone: it will break once we introduce another `ReadContext` implementation, because that implementation would silently fall back to `null` here.
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkArrayTypeITCase.java:
##########
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
+import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Integration tests for Array type support in Flink connector. */
+abstract class FlinkArrayTypeITCase extends AbstractTestBase {
Review Comment:
IT cases are very heavy, so we should avoid adding too many tests for a single purpose. I suggest the following updates:
1. Rename `FlinkArrayTypeITCase` to `FlinkComplexTypeITCase` so it can also cover future Map and Row types.
2. Add a test for a LOG TABLE that covers all array types (ARRAY<not null primitive types>, ARRAY<nullable element types>, several nested ARRAY types). Write records (including null elements when the element type is nullable) into the table, then read them back; see the DDL sketch after this list.
3. Add a test for a PK TABLE with the same types. Write to and read from the table as well, but also test updating and deleting. Take
`org.apache.fluss.flink.source.FlinkTableSourceITCase#testReadKvTableWithScanStartupModeEqualsFull`
as an example of how to verify reading a kv table for both snapshot read and incremental read. Then test lookup joins against the table.
4. Add exception tests verifying that an Array type can't be used as a primary key, bucket key, or partition key.
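
A rough sketch of the LOG table DDL that point 2 has in mind; the table name, column names, and exact type list are illustrative only:

```java
// Illustrative only: one log table whose columns exercise the array shapes listed above.
tEnv.executeSql(
        "CREATE TABLE log_array_types ("
                + " id INT,"
                + " prim_arr ARRAY<INT NOT NULL>," // non-null primitive element type
                + " nullable_arr ARRAY<STRING>," // nullable element type
                + " nested_arr ARRAY<ARRAY<INT>>" // nested array type
                + ")");
// Write a record that includes a null element, then read it back and compare
// (assumes the test method declares `throws Exception`).
tEnv.executeSql(
                "INSERT INTO log_array_types VALUES "
                        + "(1, ARRAY[1, 2], ARRAY['a', CAST(NULL AS STRING)], ARRAY[ARRAY[1], ARRAY[2, 3]])")
        .await();
```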
##########
fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java:
##########
@@ -159,28 +159,94 @@ public static ArrowReader createArrowReader(
MemorySegment segment,
int arrowOffset,
int arrowLength,
- VectorSchemaRoot schemaRoot,
+ VectorSchemaRoot sharedSchemaRoot,
BufferAllocator allocator,
- RowType rowType) {
+ RowType rowType,
+ org.apache.fluss.record.LogRecordReadContext readContext) {
ByteBuffer arrowBatchBuffer = segment.wrap(arrowOffset, arrowLength);
+
try (ReadChannel channel =
new ReadChannel(new ByteBufferReadableChannel(arrowBatchBuffer));
ArrowRecordBatch batch = deserializeRecordBatch(channel, allocator)) {
+
+ // Create a new VectorSchemaRoot for each batch to avoid data contamination
+ // when the shared root is reused for subsequent batches
+ VectorSchemaRoot batchSchemaRoot =
+ VectorSchemaRoot.create(sharedSchemaRoot.getSchema(), allocator);
+
VectorLoader vectorLoader =
- new VectorLoader(schemaRoot, ArrowCompressionFactory.INSTANCE);
+ new VectorLoader(batchSchemaRoot, ArrowCompressionFactory.INSTANCE);
vectorLoader.load(batch);
+
+ // Fix buffer writerIndex after VectorLoader.load
+ // VectorLoader sets capacity but not writerIndex for buffers
+ fixVectorBuffers(batchSchemaRoot);
+
List<ColumnVector> columnVectors = new ArrayList<>();
- List<FieldVector> fieldVectors = schemaRoot.getFieldVectors();
+ List<FieldVector> fieldVectors = batchSchemaRoot.getFieldVectors();
for (int i = 0; i < fieldVectors.size(); i++) {
columnVectors.add(
createArrowColumnVector(fieldVectors.get(i), rowType.getTypeAt(i)));
}
- return new ArrowReader(schemaRoot, columnVectors.toArray(new ColumnVector[0]));
+
+ // Register the batch root with the read context so it can be closed later
+ if (readContext != null) {
+ readContext.registerBatchRoot(batchSchemaRoot);
+ }
+
+ return new ArrowReader(batchSchemaRoot, columnVectors.toArray(new ColumnVector[0]));
} catch (IOException e) {
throw new RuntimeException("Failed to deserialize ArrowRecordBatch.", e);
}
}
+ /**
+ * Fixes writerIndex for all buffers in all vectors after VectorLoader.load().
+ * VectorLoader.load() sets the capacity but not the writerIndex for buffers.
+ */
+ private static void fixVectorBuffers(VectorSchemaRoot schemaRoot) {
Review Comment:
Why do we need this for the array type? Why didn't we need it before?
##########
fluss-common/src/main/java/org/apache/fluss/row/encode/CompactedRowEncoder.java:
##########
@@ -60,8 +60,12 @@ public void encodeField(int pos, Object value) {
@Override
public CompactedRow finishRow() {
+ int rowSize = writer.position();
+ byte[] rowBytes = new byte[rowSize];
+ writer.segment().get(0, rowBytes, 0, rowSize);
+
CompactedRow row = new CompactedRow(fieldDataTypes.length, compactedRowDeserializer);
- row.pointTo(writer.segment(), 0, writer.position());
+ row.pointTo(org.apache.fluss.memory.MemorySegment.wrap(rowBytes), 0, rowSize);
Review Comment:
We should avoid the deep copy; it introduces a performance regression.
##########
fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java:
##########
@@ -164,6 +168,7 @@ private LogRecordReadContext(
this.bufferAllocator = bufferAllocator;
this.selectedFieldGetters = selectedFieldGetters;
this.projectionPushDowned = projectionPushDowned;
+ this.batchRoots = Collections.synchronizedList(new ArrayList<>());
Review Comment:
`synchronizedList` performs badly; please avoid using it.
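
A minimal sketch of dropping `synchronizedList`, under the assumption (mine, not verified against this PR) that batch roots are only registered and released from the single reader thread that owns the context:

```java
// Plain list is enough when register/close always happen on the owning reader thread.
this.batchRoots = new ArrayList<>();

// ... and in close(), release every per-batch root that was registered:
for (VectorSchemaRoot root : batchRoots) {
    root.close();
}
batchRoots.clear();
```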
##########
fluss-common/src/main/java/org/apache/fluss/row/serializer/ArraySerializer.java:
##########
@@ -113,7 +113,7 @@ public BinaryArray toAlignedArray(InternalArray from) {
}
reuseWriter.complete();
- return reuseArray;
+ return reuseArray.copy();
Review Comment:
We should avoid `copy()`; it introduces a performance regression.
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkArrayTypeITCase.java:
##########
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
+import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Integration tests for Array type support in Flink connector. */
+abstract class FlinkArrayTypeITCase extends AbstractTestBase {
+
+ @RegisterExtension
+ public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+ FlussClusterExtension.builder().setNumOfTabletServers(3).build();
+
+ static final String CATALOG_NAME = "testcatalog";
+ static final String DEFAULT_DB = "defaultdb";
+
+ protected StreamExecutionEnvironment env;
+ protected StreamTableEnvironment tEnv;
+ protected TableEnvironment tBatchEnv;
+
+ @BeforeEach
+ void before() {
+ String bootstrapServers = FLUSS_CLUSTER_EXTENSION.getBootstrapServers();
+
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+
+ tEnv = StreamTableEnvironment.create(env);
+ tEnv.executeSql(
+ String.format(
+ "create catalog %s with ('type' = 'fluss', '%s' = '%s')",
+ CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers));
+ tEnv.executeSql("use catalog " + CATALOG_NAME);
+ tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+ tBatchEnv =
+ TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+ tBatchEnv.executeSql(
+ String.format(
+ "create catalog %s with ('type' = 'fluss', '%s' = '%s')",
+ CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers));
+ tBatchEnv.executeSql("use catalog " + CATALOG_NAME);
+ tBatchEnv
+ .getConfig()
+ .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+
+ tEnv.executeSql("create database " + DEFAULT_DB);
+ tEnv.useDatabase(DEFAULT_DB);
+ tBatchEnv.useDatabase(DEFAULT_DB);
+ }
+
+ @AfterEach
+ void after() {
+ tEnv.useDatabase(BUILTIN_DATABASE);
+ tEnv.executeSql(String.format("drop database %s cascade", DEFAULT_DB));
+ }
+
+ @Test
+ void testSimpleLogTableWithSinkAPI() throws Exception {
Review Comment:
This test is not about the array type and is not needed.
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/serializer/RowDataSerializationSchema.java:
##########
@@ -64,12 +73,25 @@ public RowDataSerializationSchema(boolean isAppendOnly, boolean ignoreDelete) {
@Override
public void open(InitializationContext context) throws Exception {
this.converter = new FlinkAsFlussRow();
+ // For primary key tables (non-append-only), we need to encode the row immediately
+ // to avoid issues with Flink reusing RowData objects
Review Comment:
I don't understand this: if it's for primary key tables, then `UpsertWriter` will materialize/encode the row into binary format, which already avoids the object-reuse problem.