This is an automated email from the ASF dual-hosted git repository.

andygrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-java.git


The following commit(s) were added to refs/heads/main by this push:
     new 50885c6  feat(csv): expose CsvReadOptions via registerCsv and readCsv 
(#21)
50885c6 is described below

commit 50885c6d176909c3ac1b7865d7e67299881cf8d0
Author: Lantao Jin <[email protected]>
AuthorDate: Wed May 13 20:56:58 2026 +0800

    feat(csv): expose CsvReadOptions via registerCsv and readCsv (#21)
---
 README.md                                          |   3 +-
 native/src/csv.rs                                  | 204 +++++++++++++++++++++
 native/src/lib.rs                                  |   1 +
 .../java/org/apache/datafusion/CsvReadOptions.java | 155 ++++++++++++++++
 .../java/org/apache/datafusion/SessionContext.java | 110 +++++++++++
 .../org/apache/datafusion/CsvReadOptionsTest.java  |  84 +++++++++
 .../apache/datafusion/SessionContextCsvTest.java   | 121 ++++++++++++
 7 files changed, 677 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 21637cd..b22ddb0 100644
--- a/README.md
+++ b/README.md
@@ -48,7 +48,8 @@ Query interfaces:
 Data sources:
 
 - [x] Parquet via `registerParquet` / `readParquet`, with `ParquetReadOptions`
-- [ ] CSV, JSON, Avro
+- [x] CSV via `registerCsv` / `readCsv`, with `CsvReadOptions`
+- [ ] JSON, Avro
 - [ ] Custom catalog and table providers
 
 Results:
diff --git a/native/src/csv.rs b/native/src/csv.rs
new file mode 100644
index 0000000..a1b6fdd
--- /dev/null
+++ b/native/src/csv.rs
@@ -0,0 +1,204 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::str::FromStr;
+
+use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::ipc::reader::StreamReader;
+use 
datafusion::datasource::file_format::file_compression_type::FileCompressionType;
+use datafusion::error::DataFusionError;
+use datafusion::prelude::{CsvReadOptions, SessionContext};
+use jni::objects::{JByteArray, JClass, JString};
+use jni::sys::{jboolean, jbyte, jlong};
+use jni::JNIEnv;
+
+use crate::errors::{try_unwrap_or_throw, JniResult};
+use crate::runtime;
+
+#[allow(clippy::too_many_arguments)] // every CSV option arrives as its own JNI argument
+fn with_csv_options<R>( // builds a CsvReadOptions from raw JNI values, then passes it to f
+    env: &mut JNIEnv,
+    has_header: jboolean,
+    delimiter: jbyte,
+    quote: jbyte,
+    terminator_set: jboolean, // non-zero => apply terminator_value (same for each *_set / *_value pair)
+    terminator_value: jbyte,
+    escape_set: jboolean,
+    escape_value: jbyte,
+    comment_set: jboolean,
+    comment_value: jbyte,
+    newlines_in_values_set: jboolean,
+    newlines_in_values_value: jboolean,
+    schema_infer_max_records: jlong, // negative => the builder value is left untouched
+    file_extension: JString,
+    file_compression_type: JString, // parsed with FileCompressionType::from_str below
+    schema_ipc_bytes: JByteArray, // null => no explicit schema is applied
+    f: impl FnOnce(CsvReadOptions) -> JniResult<R>, // receives the finished options; its result is returned as-is
+) -> JniResult<R> {
+    let file_ext: String = env.get_string(&file_extension)?.into();
+    let compression: String = env.get_string(&file_compression_type)?.into();
+    let compression = FileCompressionType::from_str(&compression)?; // a name that fails to parse propagates as an error
+
+    let schema: Option<Schema> = if !schema_ipc_bytes.is_null() { // a null array means no schema was supplied
+        let bytes: Vec<u8> = env.convert_byte_array(&schema_ipc_bytes)?;
+        let reader = StreamReader::try_new(std::io::Cursor::new(bytes), None)?; // only the schema is taken; no batches are read here
+        Some((*reader.schema()).clone())
+    } else {
+        None
+    };
+
+    let mut opts = CsvReadOptions::new() // these five values are always applied
+        .has_header(has_header != 0)
+        .delimiter(delimiter as u8)
+        .quote(quote as u8)
+        .file_extension(&file_ext)
+        .file_compression_type(compression);
+
+    if terminator_set != 0 { // the remaining options are applied only when flagged as set
+        opts = opts.terminator(Some(terminator_value as u8));
+    }
+    if escape_set != 0 {
+        opts = opts.escape(escape_value as u8);
+    }
+    if comment_set != 0 {
+        opts = opts.comment(comment_value as u8);
+    }
+    if newlines_in_values_set != 0 {
+        opts = opts.newlines_in_values(newlines_in_values_value != 0);
+    }
+    if schema_infer_max_records >= 0 { // negative values are skipped, so the cast never sees one
+        opts = opts.schema_infer_max_records(schema_infer_max_records as 
usize);
+    }
+    if let Some(ref s) = schema { // passes a borrow of the locally owned schema
+        opts = opts.schema(s);
+    }
+
+    f(opts) // the caller decides what to do with the options
+}
+
+#[allow(clippy::too_many_arguments)] // parameter list mirrors the Java native method registerCsvWithOptions
+#[no_mangle]
+pub extern "system" fn 
Java_org_apache_datafusion_SessionContext_registerCsvWithOptions<'local>(
+    mut env: JNIEnv<'local>,
+    _class: JClass<'local>,
+    handle: jlong, // address of a SessionContext; 0 is rejected below
+    name: JString<'local>, // table name passed to register_csv
+    path: JString<'local>, // CSV location passed to register_csv
+    has_header: jboolean,
+    delimiter: jbyte,
+    quote: jbyte,
+    terminator_set: jboolean,
+    terminator_value: jbyte,
+    escape_set: jboolean,
+    escape_value: jbyte,
+    comment_set: jboolean,
+    comment_value: jbyte,
+    newlines_in_values_set: jboolean,
+    newlines_in_values_value: jboolean,
+    schema_infer_max_records: jlong,
+    file_extension: JString<'local>,
+    file_compression_type: JString<'local>,
+    schema_ipc_bytes: JByteArray<'local>,
+) {
+    try_unwrap_or_throw(&mut env, (), |env| -> JniResult<()> { // NOTE(review): presumably turns an Err into a Java exception; confirm in crate::errors
+        if handle == 0 { // guard so a zero handle is never dereferenced
+            return Err("SessionContext handle is null".into());
+        }
+        let ctx = unsafe { &*(handle as *const SessionContext) }; // SAFETY (assumed): handle is a live SessionContext pointer kept by the Java object; TODO confirm
+        let name: String = env.get_string(&name)?.into();
+        let path: String = env.get_string(&path)?.into();
+        with_csv_options( // option arguments are forwarded unchanged
+            env,
+            has_header,
+            delimiter,
+            quote,
+            terminator_set,
+            terminator_value,
+            escape_set,
+            escape_value,
+            comment_set,
+            comment_value,
+            newlines_in_values_set,
+            newlines_in_values_value,
+            schema_infer_max_records,
+            file_extension,
+            file_compression_type,
+            schema_ipc_bytes,
+            |opts| {
+                runtime().block_on(async { // wait for the async registration before returning
+                    ctx.register_csv(&name, &path, opts).await?;
+                    Ok::<(), DataFusionError>(())
+                })?;
+                Ok(())
+            },
+        )
+    })
+}
+
+#[allow(clippy::too_many_arguments)] // parameter list mirrors the Java native method readCsvWithOptions
+#[no_mangle]
+pub extern "system" fn 
Java_org_apache_datafusion_SessionContext_readCsvWithOptions<'local>(
+    mut env: JNIEnv<'local>,
+    _class: JClass<'local>,
+    handle: jlong, // address of a SessionContext; 0 is rejected below
+    path: JString<'local>, // CSV location passed to read_csv
+    has_header: jboolean,
+    delimiter: jbyte,
+    quote: jbyte,
+    terminator_set: jboolean,
+    terminator_value: jbyte,
+    escape_set: jboolean,
+    escape_value: jbyte,
+    comment_set: jboolean,
+    comment_value: jbyte,
+    newlines_in_values_set: jboolean,
+    newlines_in_values_value: jboolean,
+    schema_infer_max_records: jlong,
+    file_extension: JString<'local>,
+    file_compression_type: JString<'local>,
+    schema_ipc_bytes: JByteArray<'local>,
+) -> jlong { // on success: the address of a boxed DataFrame
+    try_unwrap_or_throw(&mut env, 0, |env| -> JniResult<jlong> { // NOTE(review): 0 looks like the value returned on the error path; confirm in crate::errors
+        if handle == 0 { // guard so a zero handle is never dereferenced
+            return Err("SessionContext handle is null".into());
+        }
+        let ctx = unsafe { &*(handle as *const SessionContext) }; // SAFETY (assumed): handle is a live SessionContext pointer kept by the Java object; TODO confirm
+        let path: String = env.get_string(&path)?.into();
+        with_csv_options( // option arguments are forwarded unchanged
+            env,
+            has_header,
+            delimiter,
+            quote,
+            terminator_set,
+            terminator_value,
+            escape_set,
+            escape_value,
+            comment_set,
+            comment_value,
+            newlines_in_values_set,
+            newlines_in_values_value,
+            schema_infer_max_records,
+            file_extension,
+            file_compression_type,
+            schema_ipc_bytes,
+            |opts| {
+                let df = runtime().block_on(ctx.read_csv(path, opts))?; // wait for the async read_csv call before returning
+                Ok(Box::into_raw(Box::new(df)) as jlong) // ownership leaves Rust here; the holder of the handle must free it
+            },
+        )
+    })
+}
diff --git a/native/src/lib.rs b/native/src/lib.rs
index 463d075..fc3a983 100644
--- a/native/src/lib.rs
+++ b/native/src/lib.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+mod csv;
 mod errors;
 mod proto;
 
diff --git a/src/main/java/org/apache/datafusion/CsvReadOptions.java 
b/src/main/java/org/apache/datafusion/CsvReadOptions.java
new file mode 100644
index 0000000..4b35b95
--- /dev/null
+++ b/src/main/java/org/apache/datafusion/CsvReadOptions.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.datafusion;
+
+import org.apache.arrow.vector.types.pojo.Schema;
+
+/**
+ * Configuration knobs for CSV sources passed to {@link 
SessionContext#registerCsv(String, String,
+ * CsvReadOptions)} and {@link SessionContext#readCsv(String, CsvReadOptions)}.
+ *
+ * <p>Mirrors a subset of DataFusion's {@code CsvReadOptions}. All setters 
return {@code this} for
+ * fluent chaining. Defaults match the Rust struct: {@code hasHeader = true}, 
{@code delimiter =
+ * ','}, {@code quote = '"'}, {@code fileExtension = ".csv"}, {@code 
fileCompressionType =
+ * UNCOMPRESSED}, all other fields {@code null} (meaning the DataFusion 
default is used, or the
+ * schema is inferred from the file).
+ */
+public final class CsvReadOptions {
+
+  /** Compression of the file. Names match DataFusion's {@code 
FileCompressionType} variants. */
+  public enum FileCompressionType { // SessionContext sends name() of the chosen constant to native code
+    UNCOMPRESSED,
+    GZIP,
+    BZIP2,
+    XZ,
+    ZSTD
+  }
+
+  private boolean hasHeader = true; // primitive fields always carry a value
+  private byte delimiter = (byte) ',';
+  private byte quote = (byte) '"';
+  private Byte terminator; // boxed fields: null means the option was never set
+  private Byte escape;
+  private Byte comment;
+  private Boolean newlinesInValues;
+  private Long schemaInferMaxRecords;
+  private String fileExtension = ".csv";
+  private FileCompressionType fileCompressionType = 
FileCompressionType.UNCOMPRESSED;
+  private Schema schema; // null means no explicit schema is passed on
+
+  public CsvReadOptions hasHeader(boolean v) { // fluent setters: store the value, return this
+    this.hasHeader = v;
+    return this;
+  }
+
+  public CsvReadOptions delimiter(byte b) {
+    this.delimiter = b;
+    return this;
+  }
+
+  public CsvReadOptions quote(byte b) {
+    this.quote = b;
+    return this;
+  }
+
+  public CsvReadOptions terminator(byte b) { // autoboxes, so the field becomes non-null
+    this.terminator = b;
+    return this;
+  }
+
+  public CsvReadOptions escape(byte b) {
+    this.escape = b;
+    return this;
+  }
+
+  public CsvReadOptions comment(byte b) {
+    this.comment = b;
+    return this;
+  }
+
+  public CsvReadOptions newlinesInValues(boolean v) {
+    this.newlinesInValues = v;
+    return this;
+  }
+
+  public CsvReadOptions schemaInferMaxRecords(long n) { // NOTE(review): the native side treats negative values as unset
+    this.schemaInferMaxRecords = n;
+    return this;
+  }
+
+  public CsvReadOptions fileExtension(String ext) {
+    this.fileExtension = ext;
+    return this;
+  }
+
+  public CsvReadOptions fileCompressionType(FileCompressionType t) { // NOTE(review): null is not rejected here; SessionContext calls name() on it
+    this.fileCompressionType = t;
+    return this;
+  }
+
+  public CsvReadOptions schema(Schema schema) {
+    this.schema = schema;
+    return this;
+  }
+
+  boolean hasHeader() { // package-private getters below are read by SessionContext
+    return hasHeader;
+  }
+
+  byte delimiter() {
+    return delimiter;
+  }
+
+  byte quote() {
+    return quote;
+  }
+
+  Byte terminator() { // may return null
+    return terminator;
+  }
+
+  Byte escape() { // may return null
+    return escape;
+  }
+
+  Byte comment() { // may return null
+    return comment;
+  }
+
+  Boolean newlinesInValues() { // may return null
+    return newlinesInValues;
+  }
+
+  Long schemaInferMaxRecords() { // may return null
+    return schemaInferMaxRecords;
+  }
+
+  String fileExtension() {
+    return fileExtension;
+  }
+
+  FileCompressionType fileCompressionType() {
+    return fileCompressionType;
+  }
+
+  Schema schema() { // may return null
+    return schema;
+  }
+}
diff --git a/src/main/java/org/apache/datafusion/SessionContext.java 
b/src/main/java/org/apache/datafusion/SessionContext.java
index 823ee13..1aec343 100644
--- a/src/main/java/org/apache/datafusion/SessionContext.java
+++ b/src/main/java/org/apache/datafusion/SessionContext.java
@@ -103,6 +103,77 @@ public final class SessionContext implements AutoCloseable 
{
     }
   }
 
+  public void registerCsv(String name, String path) { // convenience overload
+    registerCsv(name, path, new CsvReadOptions()); // delegates with a default-constructed CsvReadOptions
+  }
+
+  /**
+   * Register a CSV file (or directory of CSV files) as a table with the 
supplied {@link
+   * CsvReadOptions}.
+   *
+   * @throws RuntimeException if registration fails (path not found, schema 
inference error, etc.).
+   */
+  public void registerCsv(String name, String path, CsvReadOptions options) {
+    if (nativeHandle == 0) { // a zero handle is reported as a closed context
+      throw new IllegalStateException("SessionContext is closed");
+    }
+    registerCsvWithOptions( // argument order must match the native declaration
+        nativeHandle,
+        name,
+        path,
+        options.hasHeader(),
+        options.delimiter(),
+        options.quote(),
+        options.terminator() != null, // each nullable option is flattened into a (set, value) pair
+        options.terminator() != null ? options.terminator() : 0, // 0 is a placeholder when unset
+        options.escape() != null,
+        options.escape() != null ? options.escape() : 0,
+        options.comment() != null,
+        options.comment() != null ? options.comment() : 0,
+        options.newlinesInValues() != null,
+        options.newlinesInValues() != null && options.newlinesInValues(),
+        options.schemaInferMaxRecords() != null ? 
options.schemaInferMaxRecords() : -1L, // -1 stands for unset; the native side skips negative values
+        options.fileExtension(),
+        options.fileCompressionType().name(), // the enum constant name is sent as a string
+        options.schema() != null ? serializeSchemaIpc(options.schema()) : 
null); // a null byte array means no explicit schema
+  }
+
+  /** Read a CSV file as a {@link DataFrame} without registering it. */
+  public DataFrame readCsv(String path) { // convenience overload
+    return readCsv(path, new CsvReadOptions()); // delegates with a default-constructed CsvReadOptions
+  }
+
+  /**
+   * Read a CSV file as a {@link DataFrame} with the supplied {@link 
CsvReadOptions}.
+   *
+   * @throws RuntimeException if the read fails.
+   */
+  public DataFrame readCsv(String path, CsvReadOptions options) {
+    if (nativeHandle == 0) { // a zero handle is reported as a closed context
+      throw new IllegalStateException("SessionContext is closed");
+    }
+    long dfHandle =
+        readCsvWithOptions( // argument order must match the native declaration
+            nativeHandle,
+            path,
+            options.hasHeader(),
+            options.delimiter(),
+            options.quote(),
+            options.terminator() != null, // each nullable option is flattened into a (set, value) pair
+            options.terminator() != null ? options.terminator() : 0, // 0 is a placeholder when unset
+            options.escape() != null,
+            options.escape() != null ? options.escape() : 0,
+            options.comment() != null,
+            options.comment() != null ? options.comment() : 0,
+            options.newlinesInValues() != null,
+            options.newlinesInValues() != null && options.newlinesInValues(),
+            options.schemaInferMaxRecords() != null ? 
options.schemaInferMaxRecords() : -1L, // -1 stands for unset; the native side skips negative values
+            options.fileExtension(),
+            options.fileCompressionType().name(), // the enum constant name is sent as a string
+            options.schema() != null ? serializeSchemaIpc(options.schema()) : 
null); // a null byte array means no explicit schema
+    return new DataFrame(dfHandle); // wraps the native handle returned by readCsvWithOptions
+  }
+
   public void registerParquet(String name, String path) {
     registerParquet(name, path, new ParquetReadOptions());
   }
@@ -209,5 +280,44 @@ public final class SessionContext implements AutoCloseable 
{
       long metadataSizeHint,
       byte[] schemaIpcBytes);
 
+  private static native void registerCsvWithOptions( // implemented in native/src/csv.rs
+      long handle,
+      String name,
+      String path,
+      boolean hasHeader,
+      byte delimiter,
+      byte quote,
+      boolean terminatorSet, // each xxxSet flag says whether the xxxValue that follows is applied
+      byte terminatorValue,
+      boolean escapeSet,
+      byte escapeValue,
+      boolean commentSet,
+      byte commentValue,
+      boolean newlinesInValuesSet,
+      boolean newlinesInValuesValue,
+      long schemaInferMaxRecords, // negative means unset
+      String fileExtension,
+      String fileCompressionType, // parsed natively into a FileCompressionType
+      byte[] schemaIpcBytes); // may be null
+
+  private static native long readCsvWithOptions( // implemented in native/src/csv.rs; returns a DataFrame handle
+      long handle,
+      String path,
+      boolean hasHeader,
+      byte delimiter,
+      byte quote,
+      boolean terminatorSet, // each xxxSet flag says whether the xxxValue that follows is applied
+      byte terminatorValue,
+      boolean escapeSet,
+      byte escapeValue,
+      boolean commentSet,
+      byte commentValue,
+      boolean newlinesInValuesSet,
+      boolean newlinesInValuesValue,
+      long schemaInferMaxRecords, // negative means unset
+      String fileExtension,
+      String fileCompressionType, // parsed natively into a FileCompressionType
+      byte[] schemaIpcBytes); // may be null
+
   private static native void closeSessionContext(long handle);
 }
diff --git a/src/test/java/org/apache/datafusion/CsvReadOptionsTest.java 
b/src/test/java/org/apache/datafusion/CsvReadOptionsTest.java
new file mode 100644
index 0000000..6f49403
--- /dev/null
+++ b/src/test/java/org/apache/datafusion/CsvReadOptionsTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.datafusion;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.jupiter.api.Test;
+
+class CsvReadOptionsTest { // pure-Java checks of the options holder; no native calls are made here
+
+  @Test
+  void defaultsMatchDataFusion() { // a fresh instance exposes the documented defaults
+    CsvReadOptions opts = new CsvReadOptions();
+    assertTrue(opts.hasHeader());
+    assertEquals((byte) ',', opts.delimiter());
+    assertEquals((byte) '"', opts.quote());
+    assertNull(opts.terminator()); // optional fields start out as null
+    assertNull(opts.escape());
+    assertNull(opts.comment());
+    assertNull(opts.newlinesInValues());
+    assertNull(opts.schemaInferMaxRecords());
+    assertEquals(".csv", opts.fileExtension());
+    assertEquals(CsvReadOptions.FileCompressionType.UNCOMPRESSED, 
opts.fileCompressionType());
+    assertNull(opts.schema());
+  }
+
+  @Test
+  void fluentSettersChainAndMutate() { // every setter is chained once, then every getter is read back
+    Schema schema =
+        new Schema(List.of(new Field("x", FieldType.nullable(new 
ArrowType.Int(32, true)), null)));
+
+    CsvReadOptions opts =
+        new CsvReadOptions()
+            .hasHeader(false)
+            .delimiter((byte) '|')
+            .quote((byte) '\'')
+            .terminator((byte) '\n')
+            .escape((byte) '\\')
+            .comment((byte) '#')
+            .newlinesInValues(true)
+            .schemaInferMaxRecords(10L)
+            .fileExtension(".tsv")
+            .fileCompressionType(CsvReadOptions.FileCompressionType.GZIP)
+            .schema(schema);
+
+    assertEquals(false, opts.hasHeader());
+    assertEquals((byte) '|', opts.delimiter());
+    assertEquals((byte) '\'', opts.quote());
+    assertEquals(Byte.valueOf((byte) '\n'), opts.terminator()); // boxed getters are compared against boxed values
+    assertEquals(Byte.valueOf((byte) '\\'), opts.escape());
+    assertEquals(Byte.valueOf((byte) '#'), opts.comment());
+    assertEquals(Boolean.TRUE, opts.newlinesInValues());
+    assertEquals(Long.valueOf(10L), opts.schemaInferMaxRecords());
+    assertEquals(".tsv", opts.fileExtension());
+    assertEquals(CsvReadOptions.FileCompressionType.GZIP, 
opts.fileCompressionType());
+    assertSame(schema, opts.schema()); // the same Schema instance is handed back, not a copy
+  }
+}
diff --git a/src/test/java/org/apache/datafusion/SessionContextCsvTest.java 
b/src/test/java/org/apache/datafusion/SessionContextCsvTest.java
new file mode 100644
index 0000000..47862d4
--- /dev/null
+++ b/src/test/java/org/apache/datafusion/SessionContextCsvTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.datafusion;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+class SessionContextCsvTest { // end-to-end tests: each writes a small file into a JUnit temp dir and queries it
+
+  private static Path writeCsv(Path dir, String name, String contents) throws 
IOException {
+    Path file = dir.resolve(name);
+    Files.writeString(file, contents);
+    return file;
+  }
+
+  @Test
+  void registerCsvWithHeaderInfersSchemaAndCounts(@TempDir Path tempDir) 
throws Exception {
+    Path csv = writeCsv(tempDir, "people.csv", 
"id,name\n1,alice\n2,bob\n3,carol\n");
+
+    try (BufferAllocator allocator = new RootAllocator();
+        SessionContext ctx = new SessionContext()) {
+      ctx.registerCsv("people", csv.toAbsolutePath().toString()); // default options, no explicit schema
+
+      try (DataFrame df = ctx.sql("SELECT COUNT(*) FROM people");
+          ArrowReader reader = df.collect(allocator)) {
+        assertTrue(reader.loadNextBatch());
+        BigIntVector count = (BigIntVector) 
reader.getVectorSchemaRoot().getVector(0);
+        assertEquals(3L, count.get(0)); // three data rows follow the header line
+      }
+
+      try (DataFrame df = ctx.sql("SELECT name FROM people WHERE id = 2");
+          ArrowReader reader = df.collect(allocator)) {
+        assertTrue(reader.loadNextBatch());
+        VectorSchemaRoot root = reader.getVectorSchemaRoot();
+        assertEquals(1, root.getRowCount());
+        VarCharVector names = (VarCharVector) root.getVector(0);
+        assertEquals("bob", new String(names.get(0))); // NOTE(review): decodes with the platform default charset; consider StandardCharsets.UTF_8
+      }
+    }
+  }
+
+  @Test
+  void readCsvWithExplicitSchemaAndNoHeader(@TempDir Path tempDir) throws 
Exception {
+    Path csv = writeCsv(tempDir, "headerless.csv", "1|alice\n2|bob\n"); // pipe-delimited, no header row
+
+    Schema schema =
+        new Schema(
+            List.of(
+                new Field("id", FieldType.nullable(new ArrowType.Int(64, 
true)), null),
+                new Field("name", FieldType.nullable(new ArrowType.Utf8()), 
null)));
+
+    CsvReadOptions opts =
+        new CsvReadOptions().hasHeader(false).delimiter((byte) 
'|').schema(schema);
+
+    try (BufferAllocator allocator = new RootAllocator();
+        SessionContext ctx = new SessionContext();
+        DataFrame df = ctx.readCsv(csv.toAbsolutePath().toString(), opts);
+        ArrowReader reader = df.collect(allocator)) {
+      assertTrue(reader.loadNextBatch());
+      VectorSchemaRoot root = reader.getVectorSchemaRoot();
+      assertEquals(2, root.getRowCount()); // both lines are data because hasHeader is false
+      assertEquals("id", root.getSchema().getFields().get(0).getName()); // names come from the supplied schema
+      assertEquals("name", root.getSchema().getFields().get(1).getName());
+    }
+  }
+
+  @Test
+  void registerCsvWithCustomExtension(@TempDir Path tempDir) throws Exception {
+    Path csv = writeCsv(tempDir, "data.tsv", "x\ty\n10\t20\n30\t40\n"); // tab-separated with a header row
+
+    try (BufferAllocator allocator = new RootAllocator();
+        SessionContext ctx = new SessionContext()) {
+      ctx.registerCsv(
+          "t",
+          csv.toAbsolutePath().toString(),
+          new CsvReadOptions().delimiter((byte) '\t').fileExtension(".tsv"));
+
+      try (DataFrame df = ctx.sql("SELECT SUM(x) + SUM(y) FROM t");
+          ArrowReader reader = df.collect(allocator)) {
+        assertTrue(reader.loadNextBatch());
+        BigIntVector v = (BigIntVector) 
reader.getVectorSchemaRoot().getVector(0);
+        assertEquals(100L, v.get(0)); // (10 + 30) + (20 + 40)
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to