This is an automated email from the ASF dual-hosted git repository.
cancai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/calcite.git
The following commit(s) were added to refs/heads/main by this push:
new e5192ec21d [CALCITE-6298] Support UNION in Arrow adapter
e5192ec21d is described below
commit e5192ec21d4b076e6ffc269fec3895ce9d8e59d7
Author: Cancai Cai <[email protected]>
AuthorDate: Sun Apr 5 14:45:03 2026 +0800
[CALCITE-6298] Support UNION in Arrow adapter
---
.../calcite/adapter/arrow/ArrowEnumerable.java | 12 +++--
.../adapter/arrow/ArrowFilterEnumerator.java | 8 ++-
.../adapter/arrow/ArrowProjectEnumerator.java | 6 ++-
.../apache/calcite/adapter/arrow/ArrowSchema.java | 21 ++++----
.../apache/calcite/adapter/arrow/ArrowTable.java | 60 ++++++++++++++++++----
.../calcite/adapter/arrow/ArrowAdapterTest.java | 60 +++++++++++++++++++++-
6 files changed, 139 insertions(+), 28 deletions(-)
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java
index 142f18c2f6..516822567e 100644
--- a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java
@@ -35,22 +35,26 @@ class ArrowEnumerable extends AbstractEnumerable<Object> {
private final ImmutableIntList fields;
private final @Nullable Projector projector;
private final @Nullable Filter filter;
-
+ private final Runnable onClose;
ArrowEnumerable(ArrowFileReader arrowFileReader, ImmutableIntList fields,
- @Nullable Projector projector, @Nullable Filter filter) {
+ @Nullable Projector projector, @Nullable Filter filter,
+ Runnable onClose) {
this.arrowFileReader = arrowFileReader;
this.projector = projector;
this.filter = filter;
this.fields = fields;
+ this.onClose = onClose;
}
@Override public Enumerator<Object> enumerator() {
try {
if (projector != null) {
- return new ArrowProjectEnumerator(arrowFileReader, fields, projector);
+ return new ArrowProjectEnumerator(arrowFileReader, fields, projector,
+ onClose);
} else if (filter != null) {
- return new ArrowFilterEnumerator(arrowFileReader, fields, filter);
+ return new ArrowFilterEnumerator(arrowFileReader, fields, filter,
+ onClose);
}
throw new IllegalArgumentException(
"The arrow enumerator must have either a filter or a projection");
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilterEnumerator.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilterEnumerator.java
index e54a8c6ed0..5eddec2249 100644
--- a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilterEnumerator.java
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilterEnumerator.java
@@ -45,10 +45,14 @@ class ArrowFilterEnumerator extends AbstractArrowEnumerator {
private @Nullable SelectionVector selectionVector;
private int selectionVectorIndex;
- ArrowFilterEnumerator(ArrowFileReader arrowFileReader, ImmutableIntList fields, Filter filter) {
+ private final Runnable onClose;
+
+ ArrowFilterEnumerator(ArrowFileReader arrowFileReader, ImmutableIntList fields,
+ Filter filter, Runnable onClose) {
super(arrowFileReader, fields);
this.allocator = new RootAllocator(Long.MAX_VALUE);
this.filter = filter;
+ this.onClose = onClose;
}
@Override void evaluateOperator(ArrowRecordBatch arrowRecordBatch) {
@@ -98,6 +102,8 @@ class ArrowFilterEnumerator extends AbstractArrowEnumerator {
filter.close();
} catch (GandivaException e) {
throw Util.toUnchecked(e);
+ } finally {
+ onClose.run();
}
}
}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProjectEnumerator.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProjectEnumerator.java
index 2426810bbc..0895f36cf1 100644
--- a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProjectEnumerator.java
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProjectEnumerator.java
@@ -31,11 +31,13 @@
*/
class ArrowProjectEnumerator extends AbstractArrowEnumerator {
private final Projector projector;
+ private final Runnable onClose;
ArrowProjectEnumerator(ArrowFileReader arrowFileReader, ImmutableIntList fields,
- Projector projector) {
+ Projector projector, Runnable onClose) {
super(arrowFileReader, fields);
this.projector = projector;
+ this.onClose = onClose;
}
@Override protected void evaluateOperator(ArrowRecordBatch arrowRecordBatch) {
@@ -71,6 +73,8 @@ class ArrowProjectEnumerator extends AbstractArrowEnumerator {
projector.close();
} catch (GandivaException e) {
throw Util.toUnchecked(e);
+ } finally {
+ onClose.run();
}
}
}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchema.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchema.java
index c40cf4439a..510adfb842 100644
--- a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchema.java
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchema.java
@@ -24,6 +24,7 @@
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.SeekableReadChannel;
+import org.apache.arrow.vector.types.pojo.Schema;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
@@ -34,7 +35,7 @@
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
@@ -96,21 +97,19 @@ private static Map<String, Table> deduceTableMap(File baseDirectory) {
final Map<String, Table> tables = new HashMap<>();
for (File file : files) {
final File arrowFile = new File(Sources.of(file).path());
- final FileInputStream fileInputStream;
- try {
- fileInputStream = new FileInputStream(arrowFile);
- } catch (FileNotFoundException e) {
+ final Schema arrowSchema;
+ try (FileInputStream fis = new FileInputStream(arrowFile);
+ ArrowFileReader reader =
+ new ArrowFileReader(new SeekableReadChannel(fis.getChannel()),
+ new RootAllocator())) {
+ arrowSchema = reader.getVectorSchemaRoot().getSchema();
+ } catch (IOException e) {
throw Util.toUnchecked(e);
}
- final SeekableReadChannel seekableReadChannel =
- new SeekableReadChannel(fileInputStream.getChannel());
- final RootAllocator allocator = new RootAllocator();
- final ArrowFileReader arrowFileReader =
- new ArrowFileReader(seekableReadChannel, allocator);
final String tableName =
trim(file.getName(), ".arrow").toUpperCase(Locale.ROOT);
final ArrowTable table =
- new ArrowTable(null, arrowFileReader);
+ new ArrowTable(null, arrowFile, arrowSchema);
tables.put(tableName, table);
}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java
index 358a08fb25..fa8d59389b 100644
--- a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java
@@ -43,13 +43,17 @@
import org.apache.arrow.gandiva.expression.ExpressionTree;
import org.apache.arrow.gandiva.expression.TreeBuilder;
import org.apache.arrow.gandiva.expression.TreeNode;
+import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.SeekableReadChannel;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.checkerframework.checker.nullness.qual.Nullable;
+import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
@@ -62,23 +66,34 @@
import static java.util.Objects.requireNonNull;
/**
- * Arrow Table.
+ * Table backed by an Apache Arrow file.
+ *
+ * <p>Reads data from an Arrow IPC file on disk and supports projection
+ * and filter push-down via the Gandiva expression compiler.
+ *
+ * <p>Implements {@link TranslatableTable} so that it can be converted into
+ * an {@link ArrowTableScan} for query planning, and {@link QueryableTable}
+ * so that it can be used via the {@link org.apache.calcite.linq4j} API.
*/
public class ArrowTable extends AbstractTable
implements TranslatableTable, QueryableTable {
private final @Nullable RelProtoDataType protoRowType;
/** Arrow schema. (In Calcite terminology, more like a row type than a Schema.) */
private final Schema schema;
- private final ArrowFileReader arrowFileReader;
+ private final File arrowFile;
- ArrowTable(@Nullable RelProtoDataType protoRowType, ArrowFileReader arrowFileReader) {
- try {
- this.schema = arrowFileReader.getVectorSchemaRoot().getSchema();
- } catch (IOException e) {
- throw Util.toUnchecked(e);
- }
+ /** Creates an ArrowTable.
+ *
+ * @param protoRowType Optional row type override; if null, the row type is
+ * deduced from the Arrow schema
+ * @param arrowFile Arrow IPC file on disk
+ * @param schema Arrow schema of the file
+ */
+ ArrowTable(@Nullable RelProtoDataType protoRowType, File arrowFile,
+ Schema schema) {
this.protoRowType = protoRowType;
- this.arrowFileReader = arrowFileReader;
+ this.arrowFile = arrowFile;
+ this.schema = schema;
}
@Override public RelDataType getRowType(RelDataTypeFactory typeFactory) {
@@ -148,7 +163,23 @@ public Enumerable<Object> query(DataContext root, ImmutableIntList fields,
}
}
- return new ArrowEnumerable(arrowFileReader, fields, projector, filter);
+ FileInputStream fis = null;
+ try {
+ fis = new FileInputStream(arrowFile);
+ final ArrowFileReader reader =
+ new ArrowFileReader(new SeekableReadChannel(fis.getChannel()),
+ new RootAllocator());
+ final FileInputStream fisRef = fis;
+ final Runnable onClose = () -> closeSilently(fisRef);
+ fis = null; // ownership transferred to onClose
+ return new ArrowEnumerable(reader, fields, projector, filter, onClose);
+ } catch (IOException e) {
+ throw Util.toUnchecked(e);
+ } finally {
+ if (fis != null) {
+ closeSilently(fis);
+ }
+ }
}
@Override public <T> Queryable<T> asQueryable(QueryProvider queryProvider,
@@ -200,6 +231,15 @@ private TreeNode convertConditionToGandiva(ConditionToken token) {
token.operator, treeNodes, new ArrowType.Bool());
}
+ /** Closes an {@link AutoCloseable} without throwing. */
+ private static void closeSilently(AutoCloseable closeable) {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+
private static TreeNode makeLiteralNode(String literal, String type) {
if (type.startsWith("decimal")) {
String[] typeParts =
diff --git a/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterTest.java b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterTest.java
index 67b3075b4e..e8d09d6bd9 100644
--- a/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterTest.java
+++ b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterTest.java
@@ -536,7 +536,9 @@ static void initializeArrowState(@TempDir Path sharedTempDir)
.explainContains(plan);
}
- @Disabled("UNION does not work")
+ /** Test case for
+ * <a href="https://issues.apache.org/jira/browse/CALCITE-6298">[CALCITE-6298]
+ * Support UNION in Arrow adapter</a>. */
@Test void testArrowUnion() {
String sql = "(select \"intField\"\n"
+ "from arrowdata\n"
@@ -563,6 +565,62 @@ static void initializeArrowState(@TempDir Path sharedTempDir)
.explainContains(plan);
}
+ /** Test case for
+ * <a href="https://issues.apache.org/jira/browse/CALCITE-6298">[CALCITE-6298]
+ * Support UNION in Arrow adapter</a>.
+ *
+ * <p>Tests three-way UNION to verify that multiple concurrent scans
+ * on the same table work correctly. */
+ @Test void testArrowUnionThreeWay() {
+ String sql = "(select \"intField\"\n"
+ + "from arrowdata\n"
+ + "where \"intField\" = 1)\n"
+ + " union \n"
+ + "(select \"intField\"\n"
+ + "from arrowdata\n"
+ + "where \"intField\" = 2)\n"
+ + " union \n"
+ + "(select \"intField\"\n"
+ + "from arrowdata\n"
+ + "where \"intField\" = 3)\n";
+ String result = "intField=1\nintField=2\nintField=3\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .returns(result);
+ }
+
+ /** Test case for
+ * <a href="https://issues.apache.org/jira/browse/CALCITE-6298">[CALCITE-6298]
+ * Support UNION in Arrow adapter</a>.
+ *
+ * <p>Tests four-way UNION ALL to verify that repeated scans
+ * on the same table work correctly without deduplication. */
+ @Test void testArrowUnionAllFourWay() {
+ String sql = "(select \"intField\"\n"
+ + "from arrowdata\n"
+ + "where \"intField\" = 1)\n"
+ + " union all\n"
+ + "(select \"intField\"\n"
+ + "from arrowdata\n"
+ + "where \"intField\" = 1)\n"
+ + " union all\n"
+ + "(select \"intField\"\n"
+ + "from arrowdata\n"
+ + "where \"intField\" = 2)\n"
+ + " union all\n"
+ + "(select \"intField\"\n"
+ + "from arrowdata\n"
+ + "where \"intField\" = 2)\n";
+ String result = "intField=1\nintField=1\nintField=2\nintField=2\n";
+
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .returns(result);
+ }
+
@Test void testFieldWithSpace() {
String sql = "select \"my Field\" from (select \"intField\", \"stringField\" as \"my Field\"\n"
+ "from arrowdata)\n"