Copilot commented on code in PR #2995:
URL: https://github.com/apache/fluss/pull/2995#discussion_r3032874826
##########
fluss-common/src/main/java/org/apache/fluss/row/ProjectedRow.java:
##########
@@ -190,10 +189,15 @@ public static ProjectedRow from(int[] projection) {
}
public static ProjectedRow from(Schema originSchema, Schema
expectedSchema) {
- int[] indexMapping = getIndexMapping(originSchema, expectedSchema);
+ int[] indexMapping = SchemaUtil.getIndexMapping(originSchema,
expectedSchema);
return new ProjectedRow(indexMapping);
}
+ /** Returns the index mapping used by this projected row. */
+ public int[] getIndexMapping() {
+ return indexMapping;
Review Comment:
`getIndexMapping()` returns the internal `indexMapping` array directly.
Since `ProjectedRow` is `@PublicEvolving`, external callers can mutate the
returned array and corrupt the projection behavior (and potentially break
thread-safety assumptions elsewhere).
Return a defensive copy (e.g., `Arrays.copyOf(...)`) or otherwise expose the
mapping in an immutable form.
```suggestion
return Arrays.copyOf(indexMapping, indexMapping.length);
```
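A minimal sketch of the defensive-copy accessor suggested above. `MappingHolder` is a hypothetical stand-in for `ProjectedRow`, reduced to the one field that matters here; it is not the actual Fluss class.

```java
import java.util.Arrays;

/** Hypothetical stand-in for ProjectedRow; only the accessor pattern is illustrated. */
final class MappingHolder {
    private final int[] indexMapping;

    MappingHolder(int[] indexMapping) {
        this.indexMapping = indexMapping;
    }

    /** Returns a copy, so callers cannot mutate the internal mapping. */
    int[] getIndexMapping() {
        return Arrays.copyOf(indexMapping, indexMapping.length);
    }
}
```

With this accessor, writing to the returned array (for example `holder.getIndexMapping()[0] = 99`) leaves the holder's own mapping unchanged. The cost is one array allocation per call, which may matter if the getter is invoked per record rather than per batch.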
##########
fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java:
##########
@@ -242,33 +260,79 @@ public FieldGetter[] getSelectedFieldGetters() {
return selectedFieldGetters;
}
+ /** Returns the selected row type after projection and reordering. */
+ public RowType getSelectedRowType() {
+ return selectedRowType;
+ }
+
/** Whether the projection is push downed to the server side and the
returned data is pruned. */
public boolean isProjectionPushDowned() {
return projectionPushDowned;
}
+ /**
+ * Returns the output Arrow column projection for the given schema id, or
{@code null} if no
+ * client-side projection is needed (i.e., the projection is identity).
+ *
+ * <p>The returned indexes describe which columns from the read batch
should be exposed in the
+ * final output order. A value of {@code -1} means the output column does
not exist in the read
+ * schema and should be filled with nulls.
+ */
+ @Nullable
+ public int[] getArrowColumnProjection(int schemaId) {
+ int[] projection;
+ ProjectedRow projectedRow = getOutputProjectedRow(schemaId);
+ if (projectedRow == null) {
+ projection = selectedFields;
+ } else {
+ int[] indexMapping = projectedRow.getIndexMapping();
+ projection = new int[selectedFields.length];
+ for (int i = 0; i < selectedFields.length; i++) {
+ projection[i] = indexMapping[selectedFields[i]];
+ }
+ }
+ return isIdentityProjection(projection)
+ ? null
+ : Arrays.copyOf(projection, projection.length);
+ }
+
+ private static boolean isIdentityProjection(int[] projection) {
+ for (int i = 0; i < projection.length; i++) {
+ if (projection[i] != i) {
+ return false;
+ }
+ }
+ return true;
+ }
Review Comment:
`getArrowColumnProjection()` can incorrectly return `null` (treating the
projection as identity) when the client has selected a strict prefix of the
columns, such as `[0, 1]`, but the underlying read batch schema has more
columns (e.g., remote read without server-side projection pushdown). In that
case `DefaultLogRecordBatch.loadArrowBatch()` will skip projection and return a
`VectorSchemaRoot` with extra columns, breaking the ArrowLogScanner contract.
Consider making the “identity” check depend on the *read schema* field count
(e.g., only identity if
`projection.length == getRowType(schemaId).getFieldCount()` and
`projection[i] == i` for all i), so prefix projections never collapse to `null`.
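A sketch of the stricter identity check described above, assuming the caller can supply the read schema's field count. The class name `ProjectionChecks` and the `readFieldCount` parameter are illustrative, not part of the PR.

```java
/** Hypothetical helper; in the PR this logic lives in LogRecordReadContext. */
final class ProjectionChecks {
    private ProjectionChecks() {}

    /**
     * A projection is identity only if it covers every column of the read
     * schema, in order. readFieldCount is assumed to be the number of
     * fields in the schema of the batch actually read.
     */
    static boolean isIdentityProjection(int[] projection, int readFieldCount) {
        if (projection.length != readFieldCount) {
            return false; // a prefix or subset is never identity
        }
        for (int i = 0; i < projection.length; i++) {
            if (projection[i] != i) {
                return false;
            }
        }
        return true;
    }
}
```

Under this check, `[0, 1]` against a three-column read schema is not treated as identity, so the projection is still applied and the extra column is dropped.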
##########
fluss-common/pom.xml:
##########
@@ -62,6 +62,20 @@
<artifactId>fluss-shaded-arrow</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-vector</artifactId>
+ <version>${arrow.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-netty</artifactId>
+ <version>${arrow.version}</version>
+ <scope>provided</scope>
Review Comment:
`arrow-vector` / `arrow-memory-netty` are added with scope `provided`, but
`fluss-common` now contains and exposes public classes/method signatures that
depend on `org.apache.arrow.*` (e.g., `ArrowBatchData`,
`LogRecordBatch.loadArrowBatch`). If unshaded Arrow is not on the runtime
classpath, consumers may hit `NoClassDefFoundError`.
Consider using compile/runtime scope (or making the Arrow-dependent API
isolated behind optional modules) so that runtime behavior matches the new
public API surface.
```suggestion
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
<version>${arrow.version}</version>
```
##########
fluss-client/pom.xml:
##########
@@ -49,6 +49,20 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-vector</artifactId>
+ <version>${arrow.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-netty</artifactId>
+ <version>${arrow.version}</version>
+ <scope>provided</scope>
Review Comment:
Both `fluss-common` and `fluss-client` add `org.apache.arrow:arrow-vector` /
`arrow-memory-netty` with scope `provided`, but the new public API types
(`ArrowBatchData`, `ArrowLogScanner`, and `LogRecordBatch.loadArrowBatch(...)`)
reference `org.apache.arrow.*` classes directly. If these Arrow artifacts are
not present at runtime, class loading (or first use) can fail with
`NoClassDefFoundError`.
Please ensure the runtime classpath will include unshaded Arrow (e.g., use
compile/runtime scope or clearly document/enforce that applications must add
these dependencies when using Arrow scanning).
```suggestion
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
<version>${arrow.version}</version>
```
##########
fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java:
##########
@@ -262,6 +264,79 @@ public CloseableIterator<LogRecord> records(ReadContext
context) {
}
}
+ @Override
+ public ArrowBatchData loadArrowBatch(ReadContext context) {
+ if (context.getLogFormat() != LogFormat.ARROW) {
+ throw new UnsupportedOperationException(
+ "loadArrowBatch is only supported for ARROW log format.");
+ }
+
+ int schemaId = schemaId();
+ RowType rowType = context.getRowType(schemaId);
+ org.apache.arrow.memory.BufferAllocator allocator =
+ new org.apache.arrow.memory.RootAllocator(Long.MAX_VALUE);
+ org.apache.arrow.vector.VectorSchemaRoot readRoot =
+ org.apache.arrow.vector.VectorSchemaRoot.create(
+ UnshadedArrowReadUtils.toArrowSchema(rowType),
allocator);
Review Comment:
`loadArrowBatch()` creates a brand new `RootAllocator(Long.MAX_VALUE)` for
every batch. This is expensive in hot scan loops and can also make it hard to
enforce/monitor Arrow memory usage.
Consider reusing an allocator across batches (e.g., create a shared root
allocator per scanner/read-context and then use child allocators per batch that
`ArrowBatchData.close()` can safely close), instead of allocating a new root
allocator for each batch.
##########
fluss-common/src/main/java/org/apache/fluss/record/UnshadedFlussVectorLoader.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.record;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.util.Collections2;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.TypeLayout;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.arrow.vector.compression.CompressionUtil.CodecType;
+import org.apache.arrow.vector.compression.NoCompressionCodec;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Field;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/** Unshaded variant of {@link FlussVectorLoader} for scanner/read path. */
+public class UnshadedFlussVectorLoader {
+ private final VectorSchemaRoot root;
+ private final CompressionCodec.Factory factory;
+ private boolean decompressionNeeded;
+
+ public UnshadedFlussVectorLoader(VectorSchemaRoot root,
CompressionCodec.Factory factory) {
+ this.root = root;
+ this.factory = factory;
+ }
+
+ public void load(ArrowRecordBatch recordBatch) {
+ Iterator<ArrowBuf> buffers = recordBatch.getBuffers().iterator();
+ Iterator<ArrowFieldNode> nodes = recordBatch.getNodes().iterator();
+ CompressionUtil.CodecType codecType =
+
CodecType.fromCompressionType(recordBatch.getBodyCompression().getCodec());
+ this.decompressionNeeded = codecType != CodecType.NO_COMPRESSION;
+ CompressionCodec codec =
+ this.decompressionNeeded
+ ? this.factory.createCodec(codecType)
+ : NoCompressionCodec.INSTANCE;
+
+ for (FieldVector fieldVector : this.root.getFieldVectors()) {
+ this.loadBuffers(fieldVector, fieldVector.getField(), buffers,
nodes, codec);
+ }
+
+ this.root.setRowCount(recordBatch.getLength());
+ if (nodes.hasNext() || buffers.hasNext()) {
+ throw new IllegalArgumentException(
+ "not all nodes and buffers were consumed. nodes: "
+ + Collections2.toString(nodes)
+ + " buffers: "
+ + Collections2.toString(buffers));
+ }
+ }
+
+ private void loadBuffers(
+ FieldVector vector,
+ Field field,
+ Iterator<ArrowBuf> buffers,
+ Iterator<ArrowFieldNode> nodes,
+ CompressionCodec codec) {
+ Preconditions.checkArgument(
+ nodes.hasNext(), "no more field nodes for field %s and vector
%s", field, vector);
+ ArrowFieldNode fieldNode = nodes.next();
+ int bufferLayoutCount = TypeLayout.getTypeBufferCount(field.getType());
+ List<ArrowBuf> ownBuffers = new ArrayList<>(bufferLayoutCount);
+
+ try {
+ for (int j = 0; j < bufferLayoutCount; ++j) {
+ ArrowBuf nextBuf = buffers.next();
+ ArrowBuf bufferToAdd =
+ nextBuf.writerIndex() > 0L
+ ? codec.decompress(vector.getAllocator(),
nextBuf)
+ : nextBuf;
+ ownBuffers.add(bufferToAdd);
+ if (this.decompressionNeeded) {
Review Comment:
In the compressed case, `nextBuf.getReferenceManager().retain()` is called
unconditionally when `decompressionNeeded` is true. For buffers that are
actually decompressed (`bufferToAdd != nextBuf`), this extra retain is never
balanced by a corresponding release, so closing the `ArrowRecordBatch` will
leave the original compressed buffers with a positive ref count (direct-memory
leak).
Retain the original buffer only when it is reused as the loaded buffer
(e.g., when `bufferToAdd == nextBuf`, such as for zero-length buffers), or
explicitly release the retained `nextBuf` after loading.
```suggestion
if (this.decompressionNeeded && bufferToAdd == nextBuf) {
```