This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new fd69e5ef9f GH-28866: [Java] Java Dataset API ScanOptions expansion 
(#41646)
fd69e5ef9f is described below

commit fd69e5ef9f2d60e3b7bc7fc80208e04994834b9e
Author: Jin Chengcheng <[email protected]>
AuthorDate: Tue Jul 30 13:58:41 2024 +0800

    GH-28866: [Java] Java Dataset API ScanOptions expansion (#41646)
    
    
    
    ### Rationale for this change
    
    ### What changes are included in this PR?
    
    Support passing an ArrowSchema to specify C++ 
CsvFragmentScanOptions.convert_options.column_types.
    Use a Map to set the config options, serialized in Java and deserialized in C++ for 
CsvFragmentScanOptions.
    
    ### Are these changes tested?
    Yes, covered by newly added unit tests.
    
    ### Are there any user-facing changes?
    No.
    
    * GitHub Issue: #28866
    
    Authored-by: Chengcheng Jin <[email protected]>
    Signed-off-by: David Li <[email protected]>
---
 java/dataset/src/main/cpp/jni_wrapper.cc           | 117 ++++++++++++--
 .../dataset/file/FileSystemDatasetFactory.java     |  40 ++++-
 .../org/apache/arrow/dataset/file/JniWrapper.java  |  12 +-
 .../org/apache/arrow/dataset/jni/JniWrapper.java   |   4 +
 .../apache/arrow/dataset/jni/NativeDataset.java    |  11 +-
 .../arrow/dataset/scanner/FragmentScanOptions.java |  26 ++++
 .../apache/arrow/dataset/scanner/ScanOptions.java  |  21 +++
 .../dataset/scanner/csv/CsvConvertOptions.java     |  48 ++++++
 .../scanner/csv/CsvFragmentScanOptions.java        |  93 ++++++++++++
 .../org/apache/arrow/dataset/utils/MapUtil.java    |  43 ++++++
 .../arrow/dataset/TestFragmentScanOptions.java     | 168 +++++++++++++++++++++
 java/dataset/src/test/resources/data/student.csv   |   4 +
 12 files changed, 566 insertions(+), 21 deletions(-)

diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc 
b/java/dataset/src/main/cpp/jni_wrapper.cc
index 4ef2a2ffd9..f324f87d6c 100644
--- a/java/dataset/src/main/cpp/jni_wrapper.cc
+++ b/java/dataset/src/main/cpp/jni_wrapper.cc
@@ -25,6 +25,9 @@
 #include "arrow/c/helpers.h"
 #include "arrow/dataset/api.h"
 #include "arrow/dataset/file_base.h"
+#ifdef ARROW_CSV
+#include "arrow/dataset/file_csv.h"
+#endif
 #include "arrow/filesystem/api.h"
 #include "arrow/filesystem/path_util.h"
 #include "arrow/engine/substrait/util.h"
@@ -363,6 +366,63 @@ std::shared_ptr<arrow::Buffer> 
LoadArrowBufferFromByteBuffer(JNIEnv* env, jobjec
   return buffer;
 }
 
+inline bool ParseBool(const std::string& value) { return value == "true" ? 
true : false; }
+
+/// \brief Construct FragmentScanOptions from config map
+#ifdef ARROW_CSV
+arrow::Result<std::shared_ptr<arrow::dataset::FragmentScanOptions>>
+ToCsvFragmentScanOptions(const std::unordered_map<std::string, std::string>& 
configs) {
+  std::shared_ptr<arrow::dataset::CsvFragmentScanOptions> options =
+      std::make_shared<arrow::dataset::CsvFragmentScanOptions>();
+  for (auto const& [key, value] : configs) {
+    if (key == "delimiter") {
+      options->parse_options.delimiter = value.data()[0];
+    } else if (key == "quoting") {
+      options->parse_options.quoting = ParseBool(value);
+    } else if (key == "column_types") {
+      int64_t schema_address = std::stol(value);
+      ArrowSchema* c_schema = reinterpret_cast<ArrowSchema*>(schema_address);
+      ARROW_ASSIGN_OR_RAISE(auto schema, arrow::ImportSchema(c_schema));
+      auto& column_types = options->convert_options.column_types;
+      for (auto field : schema->fields()) {
+        column_types[field->name()] = field->type();
+      }
+    } else if (key == "strings_can_be_null") {
+      options->convert_options.strings_can_be_null = ParseBool(value);
+    } else {
+      return arrow::Status::Invalid("Config " + key + " is not supported.");
+    }
+  }
+  return options;
+}
+#endif
+
+arrow::Result<std::shared_ptr<arrow::dataset::FragmentScanOptions>>
+GetFragmentScanOptions(jint file_format_id,
+                       const std::unordered_map<std::string, std::string>& 
configs) {
+  switch (file_format_id) {
+#ifdef ARROW_CSV
+    case 3:
+      return ToCsvFragmentScanOptions(configs);
+#endif
+    default:
+      return arrow::Status::Invalid("Illegal file format id: ", 
file_format_id);
+  }
+}
+
+std::unordered_map<std::string, std::string> ToStringMap(JNIEnv* env,
+                                                         jobjectArray& 
str_array) {
+  int length = env->GetArrayLength(str_array);
+  std::unordered_map<std::string, std::string> map;
+  map.reserve(length / 2);
+  for (int i = 0; i < length; i += 2) {
+    auto key = reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, 
i));
+    auto value = 
reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, i + 1));
+    map[JStringToCString(env, key)] = JStringToCString(env, value);
+  }
+  return map;
+}
+
 /*
  * Class:     org_apache_arrow_dataset_jni_NativeMemoryPool
  * Method:    getDefaultMemoryPool
@@ -501,12 +561,13 @@ JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_jni_JniWrapper_closeDataset
 /*
  * Class:     org_apache_arrow_dataset_jni_JniWrapper
  * Method:    createScanner
- * Signature: 
(J[Ljava/lang/String;Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;JJ)J
+ * Signature:
+ * 
(J[Ljava/lang/String;Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;JI;[Ljava/lang/String;J)J
  */
 JNIEXPORT jlong JNICALL 
Java_org_apache_arrow_dataset_jni_JniWrapper_createScanner(
     JNIEnv* env, jobject, jlong dataset_id, jobjectArray columns,
-    jobject substrait_projection, jobject substrait_filter,
-    jlong batch_size, jlong memory_pool_id) {
+    jobject substrait_projection, jobject substrait_filter, jlong batch_size,
+    jint file_format_id, jobjectArray options, jlong memory_pool_id) {
   JNI_METHOD_START
   arrow::MemoryPool* pool = 
reinterpret_cast<arrow::MemoryPool*>(memory_pool_id);
   if (pool == nullptr) {
@@ -555,6 +616,12 @@ JNIEXPORT jlong JNICALL 
Java_org_apache_arrow_dataset_jni_JniWrapper_createScann
     }
     JniAssertOkOrThrow(scanner_builder->Filter(*filter_expr));
   }
+  if (file_format_id != -1 && options != nullptr) {
+    std::unordered_map<std::string, std::string> option_map = ToStringMap(env, 
options);
+    std::shared_ptr<arrow::dataset::FragmentScanOptions> scan_options =
+        JniGetOrThrow(GetFragmentScanOptions(file_format_id, option_map));
+    JniAssertOkOrThrow(scanner_builder->FragmentScanOptions(scan_options));
+  }
   JniAssertOkOrThrow(scanner_builder->BatchSize(batch_size));
 
   auto scanner = JniGetOrThrow(scanner_builder->Finish());
@@ -668,14 +735,29 @@ JNIEXPORT void JNICALL 
Java_org_apache_arrow_dataset_jni_JniWrapper_ensureS3Fina
 /*
  * Class:     org_apache_arrow_dataset_file_JniWrapper
  * Method:    makeFileSystemDatasetFactory
- * Signature: (Ljava/lang/String;II)J
+ * Signature: (Ljava/lang/String;II;Ljava/lang/String;Ljava/lang/String)J
  */
 JNIEXPORT jlong JNICALL
-Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljava_lang_String_2I(
-    JNIEnv* env, jobject, jstring uri, jint file_format_id) {
+Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory(
+    JNIEnv* env, jobject, jstring uri, jint file_format_id, jobjectArray 
options) {
   JNI_METHOD_START
   std::shared_ptr<arrow::dataset::FileFormat> file_format =
       JniGetOrThrow(GetFileFormat(file_format_id));
+  if (options != nullptr) {
+    std::unordered_map<std::string, std::string> option_map = ToStringMap(env, 
options);
+    std::shared_ptr<arrow::dataset::FragmentScanOptions> scan_options =
+        JniGetOrThrow(GetFragmentScanOptions(file_format_id, option_map));
+    file_format->default_fragment_scan_options = scan_options;
+#ifdef ARROW_CSV
+    if (file_format_id == 3) {
+      std::shared_ptr<arrow::dataset::CsvFileFormat> csv_file_format =
+          
std::dynamic_pointer_cast<arrow::dataset::CsvFileFormat>(file_format);
+      csv_file_format->parse_options =
+          
std::dynamic_pointer_cast<arrow::dataset::CsvFragmentScanOptions>(scan_options)
+              ->parse_options;
+    }
+#endif
+  }
   arrow::dataset::FileSystemFactoryOptions options;
   std::shared_ptr<arrow::dataset::DatasetFactory> d =
       JniGetOrThrow(arrow::dataset::FileSystemDatasetFactory::Make(
@@ -686,16 +768,31 @@ 
Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljav
 
 /*
  * Class:     org_apache_arrow_dataset_file_JniWrapper
- * Method:    makeFileSystemDatasetFactory
- * Signature: ([Ljava/lang/String;II)J
+ * Method:    makeFileSystemDatasetFactoryWithFiles
+ * Signature: ([Ljava/lang/String;II;[Ljava/lang/String)J
  */
 JNIEXPORT jlong JNICALL
-Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory___3Ljava_lang_String_2I(
-    JNIEnv* env, jobject, jobjectArray uris, jint file_format_id) {
+Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactoryWithFiles(
+    JNIEnv* env, jobject, jobjectArray uris, jint file_format_id, jobjectArray 
options) {
   JNI_METHOD_START
 
   std::shared_ptr<arrow::dataset::FileFormat> file_format =
       JniGetOrThrow(GetFileFormat(file_format_id));
+  if (options != nullptr) {
+    std::unordered_map<std::string, std::string> option_map = ToStringMap(env, 
options);
+    std::shared_ptr<arrow::dataset::FragmentScanOptions> scan_options =
+        JniGetOrThrow(GetFragmentScanOptions(file_format_id, option_map));
+    file_format->default_fragment_scan_options = scan_options;
+#ifdef ARROW_CSV
+    if (file_format_id == 3) {
+      std::shared_ptr<arrow::dataset::CsvFileFormat> csv_file_format =
+          
std::dynamic_pointer_cast<arrow::dataset::CsvFileFormat>(file_format);
+      csv_file_format->parse_options =
+          
std::dynamic_pointer_cast<arrow::dataset::CsvFragmentScanOptions>(scan_options)
+              ->parse_options;
+    }
+#endif
+  }
   arrow::dataset::FileSystemFactoryOptions options;
 
   std::vector<std::string> uri_vec = ToStringVector(env, uris);
diff --git 
a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java
 
b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java
index 36ac6288af..fcf124a61f 100644
--- 
a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java
+++ 
b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java
@@ -16,8 +16,10 @@
  */
 package org.apache.arrow.dataset.file;
 
+import java.util.Optional;
 import org.apache.arrow.dataset.jni.NativeDatasetFactory;
 import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.dataset.scanner.FragmentScanOptions;
 import org.apache.arrow.memory.BufferAllocator;
 
 /** Java binding of the C++ FileSystemDatasetFactory. */
@@ -25,19 +27,45 @@ public class FileSystemDatasetFactory extends 
NativeDatasetFactory {
 
   public FileSystemDatasetFactory(
       BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat 
format, String uri) {
-    super(allocator, memoryPool, createNative(format, uri));
+    super(allocator, memoryPool, createNative(format, uri, Optional.empty()));
+  }
+
+  public FileSystemDatasetFactory(
+      BufferAllocator allocator,
+      NativeMemoryPool memoryPool,
+      FileFormat format,
+      String uri,
+      Optional<FragmentScanOptions> fragmentScanOptions) {
+    super(allocator, memoryPool, createNative(format, uri, 
fragmentScanOptions));
   }
 
   public FileSystemDatasetFactory(
       BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat 
format, String[] uris) {
-    super(allocator, memoryPool, createNative(format, uris));
+    super(allocator, memoryPool, createNative(format, uris, Optional.empty()));
+  }
+
+  public FileSystemDatasetFactory(
+      BufferAllocator allocator,
+      NativeMemoryPool memoryPool,
+      FileFormat format,
+      String[] uris,
+      Optional<FragmentScanOptions> fragmentScanOptions) {
+    super(allocator, memoryPool, createNative(format, uris, 
fragmentScanOptions));
   }
 
-  private static long createNative(FileFormat format, String uri) {
-    return JniWrapper.get().makeFileSystemDatasetFactory(uri, format.id());
+  private static long createNative(
+      FileFormat format, String uri, Optional<FragmentScanOptions> 
fragmentScanOptions) {
+    return JniWrapper.get()
+        .makeFileSystemDatasetFactory(
+            uri, format.id(), 
fragmentScanOptions.map(FragmentScanOptions::serialize).orElse(null));
   }
 
-  private static long createNative(FileFormat format, String[] uris) {
-    return JniWrapper.get().makeFileSystemDatasetFactory(uris, format.id());
+  private static long createNative(
+      FileFormat format, String[] uris, Optional<FragmentScanOptions> 
fragmentScanOptions) {
+    return JniWrapper.get()
+        .makeFileSystemDatasetFactoryWithFiles(
+            uris,
+            format.id(),
+            
fragmentScanOptions.map(FragmentScanOptions::serialize).orElse(null));
   }
 }
diff --git 
a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java 
b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
index dfac293ccb..d2f842f99e 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
@@ -37,22 +37,26 @@ public class JniWrapper {
    * intermediate shared_ptr of the factory instance.
    *
    * @param uri file uri to read, either a file or a directory
-   * @param fileFormat file format ID
+   * @param fileFormat file format ID.
+   * @param serializedFragmentScanOptions serialized FragmentScanOptions.
    * @return the native pointer of the 
arrow::dataset::FileSystemDatasetFactory instance.
    * @see FileFormat
    */
-  public native long makeFileSystemDatasetFactory(String uri, int fileFormat);
+  public native long makeFileSystemDatasetFactory(
+      String uri, int fileFormat, String[] serializedFragmentScanOptions);
 
   /**
    * Create FileSystemDatasetFactory and return its native pointer. The 
pointer is pointing to a
    * intermediate shared_ptr of the factory instance.
    *
    * @param uris List of file uris to read, each path pointing to an 
individual file
-   * @param fileFormat file format ID
+   * @param fileFormat file format ID.
+   * @param serializedFragmentScanOptions serialized FragmentScanOptions.
    * @return the native pointer of the 
arrow::dataset::FileSystemDatasetFactory instance.
    * @see FileFormat
    */
-  public native long makeFileSystemDatasetFactory(String[] uris, int 
fileFormat);
+  public native long makeFileSystemDatasetFactoryWithFiles(
+      String[] uris, int fileFormat, String[] serializedFragmentScanOptions);
 
   /**
    * Write the content in a {@link org.apache.arrow.c.ArrowArrayStream} into 
files. This internally
diff --git 
a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java 
b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
index b5aa3d918a..6637c113d9 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
@@ -71,6 +71,8 @@ public class JniWrapper {
    * @param substraitProjection substrait extended expression to evaluate for 
project new columns
    * @param substraitFilter substrait extended expression to evaluate for 
apply filter
    * @param batchSize batch size of scanned record batches.
+   * @param fileFormat file format ID.
+   * @param serializedFragmentScanOptions serialized FragmentScanOptions.
    * @param memoryPool identifier of memory pool used in the native scanner.
    * @return the native pointer of the arrow::dataset::Scanner instance.
    */
@@ -80,6 +82,8 @@ public class JniWrapper {
       ByteBuffer substraitProjection,
       ByteBuffer substraitFilter,
       long batchSize,
+      int fileFormat,
+      String[] serializedFragmentScanOptions,
       long memoryPool);
 
   /**
diff --git 
a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java 
b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java
index 83a9ff1f32..8f8cdc49d4 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java
@@ -16,6 +16,7 @@
  */
 package org.apache.arrow.dataset.jni;
 
+import org.apache.arrow.dataset.scanner.FragmentScanOptions;
 import org.apache.arrow.dataset.scanner.ScanOptions;
 import org.apache.arrow.dataset.source.Dataset;
 
@@ -37,7 +38,13 @@ public class NativeDataset implements Dataset {
     if (closed) {
       throw new NativeInstanceReleasedException();
     }
-
+    int fileFormatId = -1;
+    String[] serialized = null;
+    if (options.getFragmentScanOptions().isPresent()) {
+      FragmentScanOptions fragmentScanOptions = 
options.getFragmentScanOptions().get();
+      fileFormatId = fragmentScanOptions.fileFormat().id();
+      serialized = fragmentScanOptions.serialize();
+    }
     long scannerId =
         JniWrapper.get()
             .createScanner(
@@ -46,6 +53,8 @@ public class NativeDataset implements Dataset {
                 options.getSubstraitProjection().orElse(null),
                 options.getSubstraitFilter().orElse(null),
                 options.getBatchSize(),
+                fileFormatId,
+                serialized,
                 context.getMemoryPool().getNativeInstanceId());
 
     return new NativeScanner(context, scannerId);
diff --git 
a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/FragmentScanOptions.java
 
b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/FragmentScanOptions.java
new file mode 100644
index 0000000000..d48d0bd2b7
--- /dev/null
+++ 
b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/FragmentScanOptions.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.dataset.scanner;
+
+import org.apache.arrow.dataset.file.FileFormat;
+
+/** The file fragment scan options interface. It is used to transfer to JNI 
call. */
+public interface FragmentScanOptions {
+  FileFormat fileFormat();
+
+  String[] serialize();
+}
diff --git 
a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java 
b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java
index 837016ad1e..68fc3943b3 100644
--- 
a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java
+++ 
b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java
@@ -27,6 +27,8 @@ public class ScanOptions {
   private final Optional<ByteBuffer> substraitProjection;
   private final Optional<ByteBuffer> substraitFilter;
 
+  private final Optional<FragmentScanOptions> fragmentScanOptions;
+
   /**
    * Constructor.
    *
@@ -65,6 +67,7 @@ public class ScanOptions {
     this.columns = columns;
     this.substraitProjection = Optional.empty();
     this.substraitFilter = Optional.empty();
+    this.fragmentScanOptions = Optional.empty();
   }
 
   public ScanOptions(long batchSize) {
@@ -87,12 +90,17 @@ public class ScanOptions {
     return substraitFilter;
   }
 
+  public Optional<FragmentScanOptions> getFragmentScanOptions() {
+    return fragmentScanOptions;
+  }
+
   /** Builder for Options used during scanning. */
   public static class Builder {
     private final long batchSize;
     private Optional<String[]> columns;
     private ByteBuffer substraitProjection;
     private ByteBuffer substraitFilter;
+    private FragmentScanOptions fragmentScanOptions;
 
     /**
      * Constructor.
@@ -140,6 +148,18 @@ public class ScanOptions {
       return this;
     }
 
+    /**
+     * Set the FragmentScanOptions.
+     *
+     * @param fragmentScanOptions fragment scan options
+     * @return the ScanOptions configured.
+     */
+    public Builder fragmentScanOptions(FragmentScanOptions 
fragmentScanOptions) {
+      Preconditions.checkNotNull(fragmentScanOptions);
+      this.fragmentScanOptions = fragmentScanOptions;
+      return this;
+    }
+
     public ScanOptions build() {
       return new ScanOptions(this);
     }
@@ -150,5 +170,6 @@ public class ScanOptions {
     columns = builder.columns;
     substraitProjection = Optional.ofNullable(builder.substraitProjection);
     substraitFilter = Optional.ofNullable(builder.substraitFilter);
+    fragmentScanOptions = Optional.ofNullable(builder.fragmentScanOptions);
   }
 }
diff --git 
a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvConvertOptions.java
 
b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvConvertOptions.java
new file mode 100644
index 0000000000..15e257896b
--- /dev/null
+++ 
b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvConvertOptions.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.dataset.scanner.csv;
+
+import java.util.Map;
+import java.util.Optional;
+import org.apache.arrow.c.ArrowSchema;
+
+public class CsvConvertOptions {
+
+  private final Map<String, String> configs;
+
+  private Optional<ArrowSchema> cSchema = Optional.empty();
+
+  public CsvConvertOptions(Map<String, String> configs) {
+    this.configs = configs;
+  }
+
+  public Optional<ArrowSchema> getArrowSchema() {
+    return cSchema;
+  }
+
+  public Map<String, String> getConfigs() {
+    return configs;
+  }
+
+  public void set(String key, String value) {
+    configs.put(key, value);
+  }
+
+  public void setArrowSchema(ArrowSchema cSchema) {
+    this.cSchema = Optional.of(cSchema);
+  }
+}
diff --git 
a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java
 
b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java
new file mode 100644
index 0000000000..39271b5f06
--- /dev/null
+++ 
b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.dataset.scanner.csv;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.arrow.dataset.file.FileFormat;
+import org.apache.arrow.dataset.scanner.FragmentScanOptions;
+import org.apache.arrow.dataset.utils.MapUtil;
+
+public class CsvFragmentScanOptions implements FragmentScanOptions {
+  private final CsvConvertOptions convertOptions;
+  private final Map<String, String> readOptions;
+  private final Map<String, String> parseOptions;
+
+  /**
+   * CSV scan options, map to CPP struct CsvFragmentScanOptions. The key in 
config map is the field
+   * name of mapping cpp struct
+   *
+   * @param convertOptions similar to CsvFragmentScanOptions#convert_options 
in CPP, the ArrowSchema
+   *     represents column_types, convert data option such as null value 
recognition.
+   * @param readOptions similar to CsvFragmentScanOptions#read_options in CPP, 
specify how to read
+   *     the file such as block_size
+   * @param parseOptions similar to CsvFragmentScanOptions#parse_options in 
CPP, parse file option
+   *     such as delimiter
+   */
+  public CsvFragmentScanOptions(
+      CsvConvertOptions convertOptions,
+      Map<String, String> readOptions,
+      Map<String, String> parseOptions) {
+    this.convertOptions = convertOptions;
+    this.readOptions = readOptions;
+    this.parseOptions = parseOptions;
+  }
+
+  /**
+   * File format.
+   *
+   * @return file format.
+   */
+  @Override
+  public FileFormat fileFormat() {
+    return FileFormat.CSV;
+  }
+
+  /**
+   * This is an internal function to invoke by serializer. Serialize this 
class to string array and
+   * then called by JNI call.
+   *
+   * @return string array as Map JNI bridge format.
+   */
+  @Override
+  public String[] serialize() {
+    Map<String, String> options =
+        Stream.concat(
+                Stream.concat(readOptions.entrySet().stream(), 
parseOptions.entrySet().stream()),
+                convertOptions.getConfigs().entrySet().stream())
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+    if (convertOptions.getArrowSchema().isPresent()) {
+      options.put(
+          "column_types", 
Long.toString(convertOptions.getArrowSchema().get().memoryAddress()));
+    }
+    return MapUtil.convertMapToStringArray(options);
+  }
+
+  public CsvConvertOptions getConvertOptions() {
+    return convertOptions;
+  }
+
+  public Map<String, String> getReadOptions() {
+    return readOptions;
+  }
+
+  public Map<String, String> getParseOptions() {
+    return parseOptions;
+  }
+}
diff --git 
a/java/dataset/src/main/java/org/apache/arrow/dataset/utils/MapUtil.java 
b/java/dataset/src/main/java/org/apache/arrow/dataset/utils/MapUtil.java
new file mode 100644
index 0000000000..4df6cf1e0e
--- /dev/null
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/utils/MapUtil.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.dataset.utils;
+
+import java.util.Map;
+
+/** The utility class for Map. */
+public class MapUtil {
+  private MapUtil() {}
+
+  /**
+   * Convert the map to string array as JNI bridge.
+   *
+   * @param config config map
+   * @return string array for serialization
+   */
+  public static String[] convertMapToStringArray(Map<String, String> config) {
+    if (config.isEmpty()) {
+      return null;
+    }
+    String[] configs = new String[config.size() * 2];
+    int i = 0;
+    for (Map.Entry<String, String> entry : config.entrySet()) {
+      configs[i++] = entry.getKey();
+      configs[i++] = entry.getValue();
+    }
+    return configs;
+  }
+}
diff --git 
a/java/dataset/src/test/java/org/apache/arrow/dataset/TestFragmentScanOptions.java
 
b/java/dataset/src/test/java/org/apache/arrow/dataset/TestFragmentScanOptions.java
new file mode 100644
index 0000000000..9787e8308e
--- /dev/null
+++ 
b/java/dataset/src/test/java/org/apache/arrow/dataset/TestFragmentScanOptions.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.dataset;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.arrow.c.ArrowSchema;
+import org.apache.arrow.c.CDataDictionaryProvider;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.dataset.file.FileFormat;
+import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.dataset.scanner.csv.CsvConvertOptions;
+import org.apache.arrow.dataset.scanner.csv.CsvFragmentScanOptions;
+import org.apache.arrow.dataset.source.Dataset;
+import org.apache.arrow.dataset.source.DatasetFactory;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ValueIterableVector;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.hamcrest.collection.IsIterableContainingInOrder;
+import org.junit.jupiter.api.Test;
+
+public class TestFragmentScanOptions {
+
+  @Test
+  public void testCsvConvertOptions() throws Exception {
+    final Schema schema =
+        new Schema(
+            Arrays.asList(
+                Field.nullable("Id", new ArrowType.Int(32, true)),
+                Field.nullable("Name", new ArrowType.Utf8()),
+                Field.nullable("Language", new ArrowType.Utf8())),
+            null);
+    String path = "file://" + getClass().getResource("/").getPath() + 
"/data/student.csv";
+    BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+    try (ArrowSchema cSchema = ArrowSchema.allocateNew(allocator);
+        CDataDictionaryProvider provider = new CDataDictionaryProvider()) {
+      Data.exportSchema(allocator, schema, provider, cSchema);
+      CsvConvertOptions convertOptions = new 
CsvConvertOptions(ImmutableMap.of("delimiter", ";"));
+      convertOptions.setArrowSchema(cSchema);
+      CsvFragmentScanOptions fragmentScanOptions =
+          new CsvFragmentScanOptions(convertOptions, ImmutableMap.of(), 
ImmutableMap.of());
+      ScanOptions options =
+          new ScanOptions.Builder(/*batchSize*/ 32768)
+              .columns(Optional.empty())
+              .fragmentScanOptions(fragmentScanOptions)
+              .build();
+      try (DatasetFactory datasetFactory =
+              new FileSystemDatasetFactory(
+                  allocator, NativeMemoryPool.getDefault(), FileFormat.CSV, 
path);
+          Dataset dataset = datasetFactory.finish();
+          Scanner scanner = dataset.newScan(options);
+          ArrowReader reader = scanner.scanBatches()) {
+
+        assertEquals(schema.getFields(), 
reader.getVectorSchemaRoot().getSchema().getFields());
+        int rowCount = 0;
+        while (reader.loadNextBatch()) {
+          final ValueIterableVector<Integer> idVector =
+              (ValueIterableVector<Integer>) 
reader.getVectorSchemaRoot().getVector("Id");
+          assertThat(idVector.getValueIterable(), 
IsIterableContainingInOrder.contains(1, 2, 3));
+          rowCount += reader.getVectorSchemaRoot().getRowCount();
+        }
+        assertEquals(3, rowCount);
+      }
+    }
+  }
+
+  @Test
+  public void testCsvConvertOptionsDelimiterNotSet() throws Exception {
+    final Schema schema =
+        new Schema(
+            Arrays.asList(
+                Field.nullable("Id", new ArrowType.Int(32, true)),
+                Field.nullable("Name", new ArrowType.Utf8()),
+                Field.nullable("Language", new ArrowType.Utf8())),
+            null);
+    String path = "file://" + getClass().getResource("/").getPath() + 
"/data/student.csv";
+    BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+    try (ArrowSchema cSchema = ArrowSchema.allocateNew(allocator);
+        CDataDictionaryProvider provider = new CDataDictionaryProvider()) {
+      Data.exportSchema(allocator, schema, provider, cSchema);
+      CsvConvertOptions convertOptions = new 
CsvConvertOptions(ImmutableMap.of());
+      convertOptions.setArrowSchema(cSchema);
+      CsvFragmentScanOptions fragmentScanOptions =
+          new CsvFragmentScanOptions(convertOptions, ImmutableMap.of(), 
ImmutableMap.of());
+      ScanOptions options =
+          new ScanOptions.Builder(/*batchSize*/ 32768)
+              .columns(Optional.empty())
+              .fragmentScanOptions(fragmentScanOptions)
+              .build();
+      try (DatasetFactory datasetFactory =
+              new FileSystemDatasetFactory(
+                  allocator, NativeMemoryPool.getDefault(), FileFormat.CSV, 
path);
+          Dataset dataset = datasetFactory.finish();
+          Scanner scanner = dataset.newScan(options);
+          ArrowReader reader = scanner.scanBatches()) {
+
+        assertEquals(schema.getFields(), 
reader.getVectorSchemaRoot().getSchema().getFields());
+        int rowCount = 0;
+        while (reader.loadNextBatch()) {
+          final ValueIterableVector<Integer> idVector =
+              (ValueIterableVector<Integer>) 
reader.getVectorSchemaRoot().getVector("Id");
+          assertThat(idVector.getValueIterable(), 
IsIterableContainingInOrder.contains(1, 2, 3));
+          rowCount += reader.getVectorSchemaRoot().getRowCount();
+        }
+        assertEquals(3, rowCount);
+      }
+    }
+  }
+
+  @Test
+  public void testCsvConvertOptionsNoOption() throws Exception {
+    final Schema schema =
+        new Schema(
+            Collections.singletonList(Field.nullable("Id;Name;Language", new 
ArrowType.Utf8())),
+            null);
+    String path = "file://" + getClass().getResource("/").getPath() + 
"/data/student.csv";
+    BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+    ScanOptions options =
+        new ScanOptions.Builder(/*batchSize*/ 
32768).columns(Optional.empty()).build();
+    try (DatasetFactory datasetFactory =
+            new FileSystemDatasetFactory(
+                allocator, NativeMemoryPool.getDefault(), FileFormat.CSV, 
path);
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()) {
+
+      assertEquals(schema.getFields(), 
reader.getVectorSchemaRoot().getSchema().getFields());
+      int rowCount = 0;
+      while (reader.loadNextBatch()) {
+        final ValueIterableVector<String> idVector =
+            (ValueIterableVector<String>)
+                reader.getVectorSchemaRoot().getVector("Id;Name;Language");
+        assertThat(
+            idVector.getValueIterable(),
+            IsIterableContainingInOrder.contains(
+                "1;Juno;Java\n" + "2;Peter;Python\n" + "3;Celin;C++"));
+        rowCount += reader.getVectorSchemaRoot().getRowCount();
+      }
+      assertEquals(3, rowCount);
+    }
+  }
+}
diff --git a/java/dataset/src/test/resources/data/student.csv b/java/dataset/src/test/resources/data/student.csv
new file mode 100644
index 0000000000..3291946092
--- /dev/null
+++ b/java/dataset/src/test/resources/data/student.csv
@@ -0,0 +1,4 @@
+Id;Name;Language
+1;Juno;Java
+2;Peter;Python
+3;Celin;C++


Reply via email to