This is an automated email from the ASF dual-hosted git repository.

cancai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/calcite.git


The following commit(s) were added to refs/heads/main by this push:
     new e5192ec21d [CALCITE-6298] Support UNION in Arrow adapter
e5192ec21d is described below

commit e5192ec21d4b076e6ffc269fec3895ce9d8e59d7
Author: Cancai Cai <[email protected]>
AuthorDate: Sun Apr 5 14:45:03 2026 +0800

    [CALCITE-6298] Support UNION in Arrow adapter
---
 .../calcite/adapter/arrow/ArrowEnumerable.java     | 12 +++--
 .../adapter/arrow/ArrowFilterEnumerator.java       |  8 ++-
 .../adapter/arrow/ArrowProjectEnumerator.java      |  6 ++-
 .../apache/calcite/adapter/arrow/ArrowSchema.java  | 21 ++++----
 .../apache/calcite/adapter/arrow/ArrowTable.java   | 60 ++++++++++++++++++----
 .../calcite/adapter/arrow/ArrowAdapterTest.java    | 60 +++++++++++++++++++++-
 6 files changed, 139 insertions(+), 28 deletions(-)

diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java
index 142f18c2f6..516822567e 100644
--- a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowEnumerable.java
@@ -35,22 +35,26 @@ class ArrowEnumerable extends AbstractEnumerable<Object> {
   private final ImmutableIntList fields;
   private final @Nullable Projector projector;
   private final @Nullable Filter filter;
-
+  private final Runnable onClose;
 
   ArrowEnumerable(ArrowFileReader arrowFileReader, ImmutableIntList fields,
-      @Nullable Projector projector, @Nullable Filter filter) {
+      @Nullable Projector projector, @Nullable Filter filter,
+      Runnable onClose) {
     this.arrowFileReader = arrowFileReader;
     this.projector = projector;
     this.filter = filter;
     this.fields = fields;
+    this.onClose = onClose;
   }
 
   @Override public Enumerator<Object> enumerator() {
     try {
       if (projector != null) {
-        return new ArrowProjectEnumerator(arrowFileReader, fields, projector);
+        return new ArrowProjectEnumerator(arrowFileReader, fields, projector,
+            onClose);
       } else if (filter != null) {
-        return new ArrowFilterEnumerator(arrowFileReader, fields, filter);
+        return new ArrowFilterEnumerator(arrowFileReader, fields, filter,
+            onClose);
       }
       throw new IllegalArgumentException(
           "The arrow enumerator must have either a filter or a projection");
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilterEnumerator.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilterEnumerator.java
index e54a8c6ed0..5eddec2249 100644
--- a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilterEnumerator.java
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFilterEnumerator.java
@@ -45,10 +45,14 @@ class ArrowFilterEnumerator extends AbstractArrowEnumerator {
   private @Nullable SelectionVector selectionVector;
   private int selectionVectorIndex;
 
-  ArrowFilterEnumerator(ArrowFileReader arrowFileReader, ImmutableIntList fields, Filter filter) {
+  private final Runnable onClose;
+
+  ArrowFilterEnumerator(ArrowFileReader arrowFileReader, ImmutableIntList fields,
+      Filter filter, Runnable onClose) {
     super(arrowFileReader, fields);
     this.allocator = new RootAllocator(Long.MAX_VALUE);
     this.filter = filter;
+    this.onClose = onClose;
   }
 
   @Override void evaluateOperator(ArrowRecordBatch arrowRecordBatch) {
@@ -98,6 +102,8 @@ class ArrowFilterEnumerator extends AbstractArrowEnumerator {
       filter.close();
     } catch (GandivaException e) {
       throw Util.toUnchecked(e);
+    } finally {
+      onClose.run();
     }
   }
 }
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProjectEnumerator.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProjectEnumerator.java
index 2426810bbc..0895f36cf1 100644
--- a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProjectEnumerator.java
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowProjectEnumerator.java
@@ -31,11 +31,13 @@
  */
 class ArrowProjectEnumerator extends AbstractArrowEnumerator {
   private final Projector projector;
+  private final Runnable onClose;
 
   ArrowProjectEnumerator(ArrowFileReader arrowFileReader, ImmutableIntList fields,
-      Projector projector) {
+      Projector projector, Runnable onClose) {
     super(arrowFileReader, fields);
     this.projector = projector;
+    this.onClose = onClose;
   }
 
   @Override protected void evaluateOperator(ArrowRecordBatch arrowRecordBatch) {
@@ -71,6 +73,8 @@ class ArrowProjectEnumerator extends AbstractArrowEnumerator {
       projector.close();
     } catch (GandivaException e) {
       throw Util.toUnchecked(e);
+    } finally {
+      onClose.run();
     }
   }
 }
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchema.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchema.java
index c40cf4439a..510adfb842 100644
--- a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchema.java
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowSchema.java
@@ -24,6 +24,7 @@
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.ipc.ArrowFileReader;
 import org.apache.arrow.vector.ipc.SeekableReadChannel;
+import org.apache.arrow.vector.types.pojo.Schema;
 
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableMap;
@@ -34,7 +35,7 @@
 
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
@@ -96,21 +97,19 @@ private static Map<String, Table> deduceTableMap(File baseDirectory) {
     final Map<String, Table> tables = new HashMap<>();
     for (File file : files) {
       final File arrowFile = new File(Sources.of(file).path());
-      final FileInputStream fileInputStream;
-      try {
-        fileInputStream = new FileInputStream(arrowFile);
-      } catch (FileNotFoundException e) {
+      final Schema arrowSchema;
+      try (FileInputStream fis = new FileInputStream(arrowFile);
+           ArrowFileReader reader =
+               new ArrowFileReader(new SeekableReadChannel(fis.getChannel()),
+               new RootAllocator())) {
+        arrowSchema = reader.getVectorSchemaRoot().getSchema();
+      } catch (IOException e) {
         throw Util.toUnchecked(e);
       }
-      final SeekableReadChannel seekableReadChannel =
-          new SeekableReadChannel(fileInputStream.getChannel());
-      final RootAllocator allocator = new RootAllocator();
-      final ArrowFileReader arrowFileReader =
-          new ArrowFileReader(seekableReadChannel, allocator);
       final String tableName =
           trim(file.getName(), ".arrow").toUpperCase(Locale.ROOT);
       final ArrowTable table =
-          new ArrowTable(null, arrowFileReader);
+          new ArrowTable(null, arrowFile, arrowSchema);
       tables.put(tableName, table);
     }
 
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java
index 358a08fb25..fa8d59389b 100644
--- a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowTable.java
@@ -43,13 +43,17 @@
 import org.apache.arrow.gandiva.expression.ExpressionTree;
 import org.apache.arrow.gandiva.expression.TreeBuilder;
 import org.apache.arrow.gandiva.expression.TreeNode;
+import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.SeekableReadChannel;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
 
 import org.checkerframework.checker.nullness.qual.Nullable;
 
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
@@ -62,23 +66,34 @@
 import static java.util.Objects.requireNonNull;
 
 /**
- * Arrow Table.
+ * Table backed by an Apache Arrow file.
+ *
+ * <p>Reads data from an Arrow IPC file on disk and supports projection
+ * and filter push-down via the Gandiva expression compiler.
+ *
+ * <p>Implements {@link TranslatableTable} so that it can be converted into
+ * an {@link ArrowTableScan} for query planning, and {@link QueryableTable}
+ * so that it can be used via the {@link org.apache.calcite.linq4j} API.
  */
 public class ArrowTable extends AbstractTable
     implements TranslatableTable, QueryableTable {
   private final @Nullable RelProtoDataType protoRowType;
   /** Arrow schema. (In Calcite terminology, more like a row type than a Schema.) */
   private final Schema schema;
-  private final ArrowFileReader arrowFileReader;
+  private final File arrowFile;
 
-  ArrowTable(@Nullable RelProtoDataType protoRowType, ArrowFileReader arrowFileReader) {
-    try {
-      this.schema = arrowFileReader.getVectorSchemaRoot().getSchema();
-    } catch (IOException e) {
-      throw Util.toUnchecked(e);
-    }
+  /** Creates an ArrowTable.
+   *
+   * @param protoRowType Optional row type override; if null, the row type is
+   *                     deduced from the Arrow schema
+   * @param arrowFile    Arrow IPC file on disk
+   * @param schema       Arrow schema of the file
+   */
+  ArrowTable(@Nullable RelProtoDataType protoRowType, File arrowFile,
+      Schema schema) {
     this.protoRowType = protoRowType;
-    this.arrowFileReader = arrowFileReader;
+    this.arrowFile = arrowFile;
+    this.schema = schema;
   }
 
   @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) {
@@ -148,7 +163,23 @@ public Enumerable<Object> query(DataContext root, ImmutableIntList fields,
       }
     }
 
-    return new ArrowEnumerable(arrowFileReader, fields, projector, filter);
+    FileInputStream fis = null;
+    try {
+      fis = new FileInputStream(arrowFile);
+      final ArrowFileReader reader =
+          new ArrowFileReader(new SeekableReadChannel(fis.getChannel()),
+              new RootAllocator());
+      final FileInputStream fisRef = fis;
+      final Runnable onClose = () -> closeSilently(fisRef);
+      fis = null; // ownership transferred to onClose
+      return new ArrowEnumerable(reader, fields, projector, filter, onClose);
+    } catch (IOException e) {
+      throw Util.toUnchecked(e);
+    } finally {
+      if (fis != null) {
+        closeSilently(fis);
+      }
+    }
   }
 
   @Override public <T> Queryable<T> asQueryable(QueryProvider queryProvider,
@@ -200,6 +231,15 @@ private TreeNode convertConditionToGandiva(ConditionToken token) {
         token.operator, treeNodes, new ArrowType.Bool());
   }
 
+  /** Closes an {@link AutoCloseable} without throwing. */
+  private static void closeSilently(AutoCloseable closeable) {
+    try {
+      closeable.close();
+    } catch (Exception e) {
+      // ignore
+    }
+  }
+
   private static TreeNode makeLiteralNode(String literal, String type) {
     if (type.startsWith("decimal")) {
       String[] typeParts =
diff --git a/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterTest.java b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterTest.java
index 67b3075b4e..e8d09d6bd9 100644
--- a/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterTest.java
+++ b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterTest.java
@@ -536,7 +536,9 @@ static void initializeArrowState(@TempDir Path sharedTempDir)
         .explainContains(plan);
   }
 
-  @Disabled("UNION does not work")
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-6298">[CALCITE-6298]
+   * Support UNION in Arrow adapter</a>. */
   @Test void testArrowUnion() {
     String sql = "(select \"intField\"\n"
         + "from arrowdata\n"
@@ -563,6 +565,62 @@ static void initializeArrowState(@TempDir Path sharedTempDir)
         .explainContains(plan);
   }
 
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-6298">[CALCITE-6298]
+   * Support UNION in Arrow adapter</a>.
+   *
+   * <p>Tests three-way UNION to verify that multiple concurrent scans
+   * on the same table work correctly. */
+  @Test void testArrowUnionThreeWay() {
+    String sql = "(select \"intField\"\n"
+        + "from arrowdata\n"
+        + "where \"intField\" = 1)\n"
+        + "  union \n"
+        + "(select \"intField\"\n"
+        + "from arrowdata\n"
+        + "where \"intField\" = 2)\n"
+        + "  union \n"
+        + "(select \"intField\"\n"
+        + "from arrowdata\n"
+        + "where \"intField\" = 3)\n";
+    String result = "intField=1\nintField=2\nintField=3\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .returns(result);
+  }
+
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-6298">[CALCITE-6298]
+   * Support UNION in Arrow adapter</a>.
+   *
+   * <p>Tests four-way UNION ALL to verify that repeated scans
+   * on the same table work correctly without deduplication. */
+  @Test void testArrowUnionAllFourWay() {
+    String sql = "(select \"intField\"\n"
+        + "from arrowdata\n"
+        + "where \"intField\" = 1)\n"
+        + "  union all\n"
+        + "(select \"intField\"\n"
+        + "from arrowdata\n"
+        + "where \"intField\" = 1)\n"
+        + "  union all\n"
+        + "(select \"intField\"\n"
+        + "from arrowdata\n"
+        + "where \"intField\" = 2)\n"
+        + "  union all\n"
+        + "(select \"intField\"\n"
+        + "from arrowdata\n"
+        + "where \"intField\" = 2)\n";
+    String result = "intField=1\nintField=1\nintField=2\nintField=2\n";
+
+    CalciteAssert.that()
+        .with(arrow)
+        .query(sql)
+        .returns(result);
+  }
+
   @Test void testFieldWithSpace() {
     String sql = "select \"my Field\" from (select \"intField\", \"stringField\" as \"my Field\"\n"
         + "from arrowdata)\n"

Reply via email to