This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-java.git
The following commit(s) were added to refs/heads/main by this push:
new c6f608e86 GH-765: Do not close/free imported BaseStruct objects (#766)
c6f608e86 is described below
commit c6f608e863d3cefdfc41a433ed854e43b20a0925
Author: Pepijn Van Eeckhoudt <[email protected]>
AuthorDate: Wed May 28 03:06:02 2025 +0200
GH-765: Do not close/free imported BaseStruct objects (#766)
## What's Changed
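This PR removes the unconditional direct and indirect calls to `BaseStruct#close` from
`org.apache.arrow.c.Data`. Each import method gains an overload taking a
`closeImportedStructs` flag. The existing overloads pass `true` and keep the previous
behavior. By passing `false`, callers avoid eagerly closing/freeing the imported
structs and can reuse the same instances multiple times.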
Closes #765.
---
.../java/org/apache/arrow/c/ArrayImporter.java | 1 -
.../org/apache/arrow/c/ArrowArrayStreamReader.java | 1 -
c/src/main/java/org/apache/arrow/c/Data.java | 219 +++++++++++++++++++--
.../java/org/apache/arrow/c/RoundtripTest.java | 106 ++++++++--
4 files changed, 287 insertions(+), 40 deletions(-)
diff --git a/c/src/main/java/org/apache/arrow/c/ArrayImporter.java
b/c/src/main/java/org/apache/arrow/c/ArrayImporter.java
index b74fb1b47..f31a8a1fa 100644
--- a/c/src/main/java/org/apache/arrow/c/ArrayImporter.java
+++ b/c/src/main/java/org/apache/arrow/c/ArrayImporter.java
@@ -58,7 +58,6 @@ final class ArrayImporter {
ArrowArray ownedArray = ArrowArray.allocateNew(allocator);
ownedArray.save(snapshot);
src.markReleased();
- src.close();
recursionLevel = 0;
diff --git a/c/src/main/java/org/apache/arrow/c/ArrowArrayStreamReader.java
b/c/src/main/java/org/apache/arrow/c/ArrowArrayStreamReader.java
index 07a88cd8d..34a9c4ec0 100644
--- a/c/src/main/java/org/apache/arrow/c/ArrowArrayStreamReader.java
+++ b/c/src/main/java/org/apache/arrow/c/ArrowArrayStreamReader.java
@@ -44,7 +44,6 @@ final class ArrowArrayStreamReader extends ArrowReader {
this.ownedStream = ArrowArrayStream.allocateNew(allocator);
this.ownedStream.save(snapshot);
stream.markReleased();
- stream.close();
}
@Override
diff --git a/c/src/main/java/org/apache/arrow/c/Data.java
b/c/src/main/java/org/apache/arrow/c/Data.java
index 0b4da33b4..f9d2ee454 100644
--- a/c/src/main/java/org/apache/arrow/c/Data.java
+++ b/c/src/main/java/org/apache/arrow/c/Data.java
@@ -231,6 +231,22 @@ public final class Data {
new ArrayStreamExporter(allocator).export(out, reader);
}
+ /**
+ * Equivalent to calling {@link #importField(BufferAllocator, ArrowSchema,
+ * CDataDictionaryProvider, boolean) importField(allocator, schema,
provider, true)}.
+ *
+ * @param allocator Buffer allocator for allocating dictionary vectors
+ * @param schema C data interface struct representing the field [inout]
+ * @param provider A dictionary provider will be initialized with empty
dictionary vectors
+ * (optional)
+ * @return Imported field object
+ * @see #importField(BufferAllocator, ArrowSchema, CDataDictionaryProvider,
boolean)
+ */
+ public static Field importField(
+ BufferAllocator allocator, ArrowSchema schema, CDataDictionaryProvider
provider) {
+ return importField(allocator, schema, provider, true);
+ }
+
/**
* Import Java Field from the C data interface.
*
@@ -241,19 +257,42 @@ public final class Data {
* @param schema C data interface struct representing the field [inout]
* @param provider A dictionary provider will be initialized with empty
dictionary vectors
* (optional)
+ * @param closeImportedStructs if true, the ArrowSchema struct will be
closed when this method
+ * completes.
* @return Imported field object
*/
public static Field importField(
- BufferAllocator allocator, ArrowSchema schema, CDataDictionaryProvider
provider) {
+ BufferAllocator allocator,
+ ArrowSchema schema,
+ CDataDictionaryProvider provider,
+ boolean closeImportedStructs) {
try {
SchemaImporter importer = new SchemaImporter(allocator);
return importer.importField(schema, provider);
} finally {
schema.release();
- schema.close();
+ if (closeImportedStructs) {
+ schema.close();
+ }
}
}
+ /**
+ * Equivalent to calling {@link #importSchema(BufferAllocator, ArrowSchema,
+ * CDataDictionaryProvider, boolean) importSchema(allocator, schema,
provider, true)}.
+ *
+ * @param allocator Buffer allocator for allocating dictionary vectors
+ * @param schema C data interface struct representing the field
+ * @param provider A dictionary provider will be initialized with empty
dictionary vectors
+ * (optional)
+ * @return Imported schema object
+ * @see #importSchema(BufferAllocator, ArrowSchema, CDataDictionaryProvider,
boolean)
+ */
+ public static Schema importSchema(
+ BufferAllocator allocator, ArrowSchema schema, CDataDictionaryProvider
provider) {
+ return importSchema(allocator, schema, provider, true);
+ }
+
/**
* Import Java Schema from the C data interface.
*
@@ -264,11 +303,16 @@ public final class Data {
* @param schema C data interface struct representing the field
* @param provider A dictionary provider will be initialized with empty
dictionary vectors
* (optional)
+ * @param closeImportedStructs if true, the ArrowSchema struct will be
closed when this method
+ * completes.
* @return Imported schema object
*/
public static Schema importSchema(
- BufferAllocator allocator, ArrowSchema schema, CDataDictionaryProvider
provider) {
- Field structField = importField(allocator, schema, provider);
+ BufferAllocator allocator,
+ ArrowSchema schema,
+ CDataDictionaryProvider provider,
+ boolean closeImportedStructs) {
+ Field structField = importField(allocator, schema, provider,
closeImportedStructs);
if (structField.getType().getTypeID() != ArrowTypeID.Struct) {
throw new IllegalArgumentException(
"Cannot import schema: ArrowSchema describes non-struct type");
@@ -276,24 +320,67 @@ public final class Data {
return new Schema(structField.getChildren(), structField.getMetadata());
}
+ /**
+ * Equivalent to calling {@link #importIntoVector(BufferAllocator, ArrowArray, FieldVector,
+ * DictionaryProvider, boolean) importIntoVector(allocator, array, vector, provider, true)}.
+ *
+ * @param allocator Buffer allocator
+ * @param array C data interface struct holding the array data
+ * @param vector Imported vector object [out]
+ * @param provider Dictionary provider to load dictionary vectors to
(optional)
+ * @see #importIntoVector(BufferAllocator, ArrowArray, FieldVector,
DictionaryProvider, boolean)
+ */
+ public static void importIntoVector(
+ BufferAllocator allocator,
+ ArrowArray array,
+ FieldVector vector,
+ DictionaryProvider provider) {
+ importIntoVector(allocator, array, vector, provider, true);
+ }
+
/**
* Import Java vector from the C data interface.
*
- * <p>The ArrowArray struct has its contents moved (as per the C data
interface specification) to
- * a private object held alive by the resulting array.
+ * <p>On successful completion, the ArrowArray struct will have been moved
(as per the C data
+ * interface specification) to a private object held alive by the resulting
array.
*
* @param allocator Buffer allocator
* @param array C data interface struct holding the array data
* @param vector Imported vector object [out]
* @param provider Dictionary provider to load dictionary vectors to
(optional)
+ * @param closeImportedStructs if true, the ArrowArray struct will be closed
when this method
+ * completes successfully.
*/
public static void importIntoVector(
BufferAllocator allocator,
ArrowArray array,
FieldVector vector,
- DictionaryProvider provider) {
+ DictionaryProvider provider,
+ boolean closeImportedStructs) {
ArrayImporter importer = new ArrayImporter(allocator, vector, provider);
importer.importArray(array);
+ if (closeImportedStructs) {
+ array.close();
+ }
+ }
+
+ /**
+ * Equivalent to calling {@link #importVector(BufferAllocator, ArrowArray,
ArrowSchema,
+ * CDataDictionaryProvider, boolean) importVector(allocator, array, schema,
provider, true)}.
+ *
+ * @param allocator Buffer allocator for allocating the output FieldVector
+ * @param array C data interface struct holding the array data
+ * @param schema C data interface struct holding the array type
+ * @param provider Dictionary provider to load dictionary vectors to
(optional)
+ * @return Imported vector object
+ * @see #importVector(BufferAllocator, ArrowArray, ArrowSchema,
CDataDictionaryProvider, boolean)
+ */
+ public static FieldVector importVector(
+ BufferAllocator allocator,
+ ArrowArray array,
+ ArrowSchema schema,
+ CDataDictionaryProvider provider) {
+ return importVector(allocator, array, schema, provider, true);
}
/**
@@ -307,19 +394,42 @@ public final class Data {
* @param array C data interface struct holding the array data
* @param schema C data interface struct holding the array type
* @param provider Dictionary provider to load dictionary vectors to
(optional)
+ * @param closeImportedStructs if true, the ArrowArray struct will be closed when this method
+ * completes successfully, and the ArrowSchema struct will be closed even if the import fails.
* @return Imported vector object
*/
public static FieldVector importVector(
BufferAllocator allocator,
ArrowArray array,
ArrowSchema schema,
- CDataDictionaryProvider provider) {
- Field field = importField(allocator, schema, provider);
+ CDataDictionaryProvider provider,
+ boolean closeImportedStructs) {
+ Field field = importField(allocator, schema, provider,
closeImportedStructs);
FieldVector vector = field.createVector(allocator);
- importIntoVector(allocator, array, vector, provider);
+ importIntoVector(allocator, array, vector, provider, closeImportedStructs);
return vector;
}
+ /**
+ * Equivalent to calling {@link #importIntoVectorSchemaRoot(BufferAllocator,
ArrowArray,
+ * VectorSchemaRoot, DictionaryProvider, boolean)
importIntoVectorSchemaRoot(allocator, array,
+ * root, provider, true)}.
+ *
+ * @param allocator Buffer allocator
+ * @param array C data interface struct holding the record batch data
+ * @param root vector schema root to load into
+ * @param provider Dictionary provider to load dictionary vectors to
(optional)
+ * @see #importIntoVectorSchemaRoot(BufferAllocator, ArrowArray,
VectorSchemaRoot,
+ * DictionaryProvider, boolean)
+ */
+ public static void importIntoVectorSchemaRoot(
+ BufferAllocator allocator,
+ ArrowArray array,
+ VectorSchemaRoot root,
+ DictionaryProvider provider) {
+ importIntoVectorSchemaRoot(allocator, array, root, provider, true);
+ }
+
/**
* Import record batch from the C data interface into vector schema root.
*
@@ -333,15 +443,18 @@ public final class Data {
* @param array C data interface struct holding the record batch data
* @param root vector schema root to load into
* @param provider Dictionary provider to load dictionary vectors to
(optional)
+ * @param closeImportedStructs if true, the ArrowArray struct will be closed when this method
+ * completes successfully.
*/
public static void importIntoVectorSchemaRoot(
BufferAllocator allocator,
ArrowArray array,
VectorSchemaRoot root,
- DictionaryProvider provider) {
+ DictionaryProvider provider,
+ boolean closeImportedStructs) {
try (StructVector structVector = StructVector.emptyWithDuplicates("",
allocator)) {
structVector.initializeChildrenFromFields(root.getSchema().getFields());
- importIntoVector(allocator, array, structVector, provider);
+ importIntoVector(allocator, array, structVector, provider,
closeImportedStructs);
StructVectorUnloader unloader = new StructVectorUnloader(structVector);
VectorLoader loader = new VectorLoader(root);
try (ArrowRecordBatch recordBatch = unloader.getRecordBatch()) {
@@ -350,6 +463,21 @@ public final class Data {
}
}
+ /**
+ * Equivalent to calling {@link #importVectorSchemaRoot(BufferAllocator,
ArrowSchema,
+ * CDataDictionaryProvider, boolean) importVectorSchemaRoot(allocator,
schema, provider, true)}.
+ *
+ * @param allocator Buffer allocator for allocating the output
VectorSchemaRoot
+ * @param schema C data interface struct holding the record batch schema
+ * @param provider Dictionary provider to load dictionary vectors to
(optional)
+ * @return Imported vector schema root
+ * @see #importVectorSchemaRoot(BufferAllocator, ArrowSchema,
CDataDictionaryProvider, boolean)
+ */
+ public static VectorSchemaRoot importVectorSchemaRoot(
+ BufferAllocator allocator, ArrowSchema schema, CDataDictionaryProvider
provider) {
+ return importVectorSchemaRoot(allocator, schema, provider, true);
+ }
+
/**
* Import Java vector schema root from a C data interface Schema.
*
@@ -360,11 +488,37 @@ public final class Data {
* @param allocator Buffer allocator for allocating the output
VectorSchemaRoot
* @param schema C data interface struct holding the record batch schema
* @param provider Dictionary provider to load dictionary vectors to
(optional)
+ * @param closeImportedStructs if true, the ArrowSchema struct will be closed when this method
+ * completes.
* @return Imported vector schema root
*/
public static VectorSchemaRoot importVectorSchemaRoot(
- BufferAllocator allocator, ArrowSchema schema, CDataDictionaryProvider
provider) {
- return importVectorSchemaRoot(allocator, null, schema, provider);
+ BufferAllocator allocator,
+ ArrowSchema schema,
+ CDataDictionaryProvider provider,
+ boolean closeImportedStructs) {
+ return importVectorSchemaRoot(allocator, null, schema, provider,
closeImportedStructs);
+ }
+
+ /**
+ * Equivalent to calling {@link #importVectorSchemaRoot(BufferAllocator,
ArrowArray, ArrowSchema,
+ * CDataDictionaryProvider, boolean) importVectorSchemaRoot(allocator,
array, schema, provider,
+ * true)}.
+ *
+ * @param allocator Buffer allocator for allocating the output
VectorSchemaRoot
+ * @param array C data interface struct holding the record batch data
(optional)
+ * @param schema C data interface struct holding the record batch schema
+ * @param provider Dictionary provider to load dictionary vectors to
(optional)
+ * @return Imported vector schema root
+ * @see #importVectorSchemaRoot(BufferAllocator, ArrowArray, ArrowSchema,
CDataDictionaryProvider,
+ * boolean)
+ */
+ public static VectorSchemaRoot importVectorSchemaRoot(
+ BufferAllocator allocator,
+ ArrowArray array,
+ ArrowSchema schema,
+ CDataDictionaryProvider provider) {
+ return importVectorSchemaRoot(allocator, array, schema, provider, true);
}
/**
@@ -383,29 +537,56 @@ public final class Data {
* @param array C data interface struct holding the record batch data
(optional)
* @param schema C data interface struct holding the record batch schema
* @param provider Dictionary provider to load dictionary vectors to
(optional)
+ * @param closeImportedStructs if true, the ArrowArray struct will be closed when this method
+ * completes successfully, and the ArrowSchema struct will be closed even if the import fails.
* @return Imported vector schema root
*/
public static VectorSchemaRoot importVectorSchemaRoot(
BufferAllocator allocator,
ArrowArray array,
ArrowSchema schema,
- CDataDictionaryProvider provider) {
+ CDataDictionaryProvider provider,
+ boolean closeImportedStructs) {
VectorSchemaRoot vsr =
- VectorSchemaRoot.create(importSchema(allocator, schema, provider),
allocator);
+ VectorSchemaRoot.create(
+ importSchema(allocator, schema, provider, closeImportedStructs),
allocator);
if (array != null) {
- importIntoVectorSchemaRoot(allocator, array, vsr, provider);
+ importIntoVectorSchemaRoot(allocator, array, vsr, provider,
closeImportedStructs);
}
return vsr;
}
/**
- * Import an ArrowArrayStream as an {@link ArrowReader}.
+ * Equivalent to calling {@link #importArrayStream(BufferAllocator,
ArrowArrayStream, boolean)
+ * importArrayStream(allocator, stream, true)}.
*
* @param allocator Buffer allocator for allocating the output data.
* @param stream C stream interface struct to import.
* @return Imported reader
+ * @see #importArrayStream(BufferAllocator, ArrowArrayStream, boolean)
*/
public static ArrowReader importArrayStream(BufferAllocator allocator,
ArrowArrayStream stream) {
- return new ArrowArrayStreamReader(allocator, stream);
+ return importArrayStream(allocator, stream, true);
+ }
+
+ /**
+ * Import an ArrowArrayStream as an {@link ArrowReader}.
+ *
+ * <p>On successful completion, the ArrowArrayStream struct will have been
moved (as per the C
+ * data interface specification) to a private object held alive by the
resulting ArrowReader.
+ *
+ * @param allocator Buffer allocator for allocating the output data.
+ * @param stream C stream interface struct to import.
+ * @param closeImportedStructs if true, the ArrowArrayStream struct will be closed when this
+ * method completes successfully.
+ * @return Imported reader
+ */
+ public static ArrowReader importArrayStream(
+ BufferAllocator allocator, ArrowArrayStream stream, boolean
closeImportedStructs) {
+ ArrowArrayStreamReader reader = new ArrowArrayStreamReader(allocator,
stream);
+ if (closeImportedStructs) {
+ stream.close();
+ }
+ return reader;
}
}
diff --git a/c/src/test/java/org/apache/arrow/c/RoundtripTest.java
b/c/src/test/java/org/apache/arrow/c/RoundtripTest.java
index 6d68449c0..010a30549 100644
--- a/c/src/test/java/org/apache/arrow/c/RoundtripTest.java
+++ b/c/src/test/java/org/apache/arrow/c/RoundtripTest.java
@@ -17,9 +17,7 @@
package org.apache.arrow.c;
import static
org.apache.arrow.vector.testing.ValueVectorDataPopulator.setVector;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.*;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -958,6 +956,50 @@ public class RoundtripTest {
@Test
public void testSchema() {
+ Schema schema = createSchema();
+ // Consumer allocates empty ArrowSchema
+ try (ArrowSchema consumerArrowSchema = ArrowSchema.allocateNew(allocator))
{
+ // Producer fills the schema with data
+ exportSchema(schema, consumerArrowSchema);
+
+ // Consumer imports schema
+ Schema importedSchema = Data.importSchema(allocator,
consumerArrowSchema, null);
+ assertEquals(schema.toJson(), importedSchema.toJson());
+ }
+ }
+
+ @Test
+ public void testSchemaStructReuse() {
+ Schema schema = createSchema();
+ // Consumer allocates empty ArrowSchema
+ try (ArrowSchema consumerArrowSchema = ArrowSchema.allocateNew(allocator))
{
+ // Producer fills the schema with data
+ exportSchema(schema, consumerArrowSchema);
+
+ // Consumer imports schema
+ Schema importedSchema = Data.importSchema(allocator,
consumerArrowSchema, null, false);
+ assertEquals(schema.toJson(), importedSchema.toJson());
+
+ // Imported struct should be released but not closed
+ assertEquals(0, consumerArrowSchema.snapshot().release);
+ assertNotEquals(0, consumerArrowSchema.memoryAddress());
+
+ // Export and import again
+ exportSchema(schema, consumerArrowSchema);
+ importedSchema = Data.importSchema(allocator, consumerArrowSchema, null,
false);
+ assertEquals(schema.toJson(), importedSchema.toJson());
+ assertEquals(0, consumerArrowSchema.snapshot().release);
+ assertNotEquals(0, consumerArrowSchema.memoryAddress());
+ }
+ }
+
+ private void exportSchema(Schema schema, ArrowSchema targetArrowSchema) {
+ try (ArrowSchema arrowSchema =
ArrowSchema.wrap(targetArrowSchema.memoryAddress())) {
+ Data.exportSchema(allocator, schema, null, arrowSchema);
+ }
+ }
+
+ private static Schema createSchema() {
Field decimalField =
new Field("inner1", FieldType.nullable(new ArrowType.Decimal(19, 4,
128)), null);
Field strField = new Field("inner2", FieldType.nullable(new
ArrowType.Utf8()), null);
@@ -968,16 +1010,7 @@ public class RoundtripTest {
Arrays.asList(decimalField, strField));
Field intField = new Field("col2", FieldType.nullable(new
ArrowType.Int(32, true)), null);
Schema schema = new Schema(Arrays.asList(itemField, intField));
- // Consumer allocates empty ArrowSchema
- try (ArrowSchema consumerArrowSchema = ArrowSchema.allocateNew(allocator))
{
- // Producer fills the schema with data
- try (ArrowSchema arrowSchema =
ArrowSchema.wrap(consumerArrowSchema.memoryAddress())) {
- Data.exportSchema(allocator, schema, null, arrowSchema);
- }
- // Consumer imports schema
- Schema importedSchema = Data.importSchema(allocator,
consumerArrowSchema, null);
- assertEquals(schema.toJson(), importedSchema.toJson());
- }
+ return schema;
}
@Test
@@ -1002,12 +1035,8 @@ public class RoundtripTest {
try (ArrowSchema consumerArrowSchema = ArrowSchema.allocateNew(allocator);
ArrowArray consumerArrowArray = ArrowArray.allocateNew(allocator)) {
// Producer creates structures from existing memory pointers
- try (ArrowSchema arrowSchema =
ArrowSchema.wrap(consumerArrowSchema.memoryAddress());
- ArrowArray arrowArray =
ArrowArray.wrap(consumerArrowArray.memoryAddress())) {
- // Producer exports vector into the C Data Interface structures
- try (final NullVector vector = new NullVector()) {
- Data.exportVector(allocator, vector, null, arrowArray, arrowSchema);
- }
+ try (final NullVector vector = new NullVector()) {
+ exportFieldVector(vector, consumerArrowSchema, consumerArrowArray);
}
// Release array structure
@@ -1025,6 +1054,45 @@ public class RoundtripTest {
}
}
+ @Test
+ public void testArrayStructReuse() {
+ // Consumer allocates empty structures
+ try (ArrowSchema consumerArrowSchema = ArrowSchema.allocateNew(allocator);
+ ArrowArray consumerArrowArray = ArrowArray.allocateNew(allocator)) {
+ // Producer creates structures from existing memory pointers
+ try (final NullVector vector = new NullVector()) {
+ exportFieldVector(vector, consumerArrowSchema, consumerArrowArray);
+ }
+ Data.importVector(allocator, consumerArrowArray, consumerArrowSchema,
null, false);
+
+ // Imported structs should be released but not closed
+ assertEquals(0, consumerArrowSchema.snapshot().release);
+ assertNotEquals(0, consumerArrowSchema.memoryAddress());
+ assertEquals(0, consumerArrowArray.snapshot().release);
+ assertNotEquals(0, consumerArrowArray.memoryAddress());
+
+ try (final NullVector vector = new NullVector()) {
+ exportFieldVector(vector, consumerArrowSchema, consumerArrowArray);
+ }
+ Data.importVector(allocator, consumerArrowArray, consumerArrowSchema,
null, false);
+
+ // Imported structs should be released but not closed
+ assertEquals(0, consumerArrowSchema.snapshot().release);
+ assertNotEquals(0, consumerArrowSchema.memoryAddress());
+ assertEquals(0, consumerArrowArray.snapshot().release);
+ assertNotEquals(0, consumerArrowArray.memoryAddress());
+ }
+ }
+
+ private void exportFieldVector(
+ FieldVector vector, ArrowSchema consumerArrowSchema, ArrowArray
consumerArrowArray) {
+ try (ArrowSchema arrowSchema =
ArrowSchema.wrap(consumerArrowSchema.memoryAddress());
+ ArrowArray arrowArray =
ArrowArray.wrap(consumerArrowArray.memoryAddress())) {
+ // Producer exports vector into the C Data Interface structures
+ Data.exportVector(allocator, vector, null, arrowArray, arrowSchema);
+ }
+ }
+
private VectorSchemaRoot createTestVSR() {
BitVector bitVector = new BitVector("boolean", allocator);