This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new fd69e5ef9f GH-28866: [Java] Java Dataset API ScanOptions expansion
(#41646)
fd69e5ef9f is described below
commit fd69e5ef9f2d60e3b7bc7fc80208e04994834b9e
Author: Jin Chengcheng <[email protected]>
AuthorDate: Tue Jul 30 13:58:41 2024 +0800
GH-28866: [Java] Java Dataset API ScanOptions expansion (#41646)
### Rationale for this change
### What changes are included in this PR?
Support adding an ArrowSchema to specify the C++
CsvFragmentScanOptions.convert_options.column_types,
and use a Map to set the config, serialized in Java and deserialized in C++ for
CsvFragmentScanOptions.
### Are these changes tested?
Newly added unit tests.
### Are there any user-facing changes?
No.
* GitHub Issue: #28866
Authored-by: Chengcheng Jin <[email protected]>
Signed-off-by: David Li <[email protected]>
---
java/dataset/src/main/cpp/jni_wrapper.cc | 117 ++++++++++++--
.../dataset/file/FileSystemDatasetFactory.java | 40 ++++-
.../org/apache/arrow/dataset/file/JniWrapper.java | 12 +-
.../org/apache/arrow/dataset/jni/JniWrapper.java | 4 +
.../apache/arrow/dataset/jni/NativeDataset.java | 11 +-
.../arrow/dataset/scanner/FragmentScanOptions.java | 26 ++++
.../apache/arrow/dataset/scanner/ScanOptions.java | 21 +++
.../dataset/scanner/csv/CsvConvertOptions.java | 48 ++++++
.../scanner/csv/CsvFragmentScanOptions.java | 93 ++++++++++++
.../org/apache/arrow/dataset/utils/MapUtil.java | 43 ++++++
.../arrow/dataset/TestFragmentScanOptions.java | 168 +++++++++++++++++++++
java/dataset/src/test/resources/data/student.csv | 4 +
12 files changed, 566 insertions(+), 21 deletions(-)
diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc
b/java/dataset/src/main/cpp/jni_wrapper.cc
index 4ef2a2ffd9..f324f87d6c 100644
--- a/java/dataset/src/main/cpp/jni_wrapper.cc
+++ b/java/dataset/src/main/cpp/jni_wrapper.cc
@@ -25,6 +25,9 @@
#include "arrow/c/helpers.h"
#include "arrow/dataset/api.h"
#include "arrow/dataset/file_base.h"
+#ifdef ARROW_CSV
+#include "arrow/dataset/file_csv.h"
+#endif
#include "arrow/filesystem/api.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/engine/substrait/util.h"
@@ -363,6 +366,63 @@ std::shared_ptr<arrow::Buffer>
LoadArrowBufferFromByteBuffer(JNIEnv* env, jobjec
return buffer;
}
+inline bool ParseBool(const std::string& value) { return value == "true" ?
true : false; }
+
+/// \brief Construct FragmentScanOptions from config map
+#ifdef ARROW_CSV
+arrow::Result<std::shared_ptr<arrow::dataset::FragmentScanOptions>>
+ToCsvFragmentScanOptions(const std::unordered_map<std::string, std::string>&
configs) {
+ std::shared_ptr<arrow::dataset::CsvFragmentScanOptions> options =
+ std::make_shared<arrow::dataset::CsvFragmentScanOptions>();
+ for (auto const& [key, value] : configs) {
+ if (key == "delimiter") {
+ options->parse_options.delimiter = value.data()[0];
+ } else if (key == "quoting") {
+ options->parse_options.quoting = ParseBool(value);
+ } else if (key == "column_types") {
+ int64_t schema_address = std::stol(value);
+ ArrowSchema* c_schema = reinterpret_cast<ArrowSchema*>(schema_address);
+ ARROW_ASSIGN_OR_RAISE(auto schema, arrow::ImportSchema(c_schema));
+ auto& column_types = options->convert_options.column_types;
+ for (auto field : schema->fields()) {
+ column_types[field->name()] = field->type();
+ }
+ } else if (key == "strings_can_be_null") {
+ options->convert_options.strings_can_be_null = ParseBool(value);
+ } else {
+ return arrow::Status::Invalid("Config " + key + " is not supported.");
+ }
+ }
+ return options;
+}
+#endif
+
+arrow::Result<std::shared_ptr<arrow::dataset::FragmentScanOptions>>
+GetFragmentScanOptions(jint file_format_id,
+ const std::unordered_map<std::string, std::string>&
configs) {
+ switch (file_format_id) {
+#ifdef ARROW_CSV
+ case 3:
+ return ToCsvFragmentScanOptions(configs);
+#endif
+ default:
+ return arrow::Status::Invalid("Illegal file format id: ",
file_format_id);
+ }
+}
+
+std::unordered_map<std::string, std::string> ToStringMap(JNIEnv* env,
+ jobjectArray&
str_array) {
+ int length = env->GetArrayLength(str_array);
+ std::unordered_map<std::string, std::string> map;
+ map.reserve(length / 2);
+ for (int i = 0; i < length; i += 2) {
+ auto key = reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array,
i));
+ auto value =
reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, i + 1));
+ map[JStringToCString(env, key)] = JStringToCString(env, value);
+ }
+ return map;
+}
+
/*
* Class: org_apache_arrow_dataset_jni_NativeMemoryPool
* Method: getDefaultMemoryPool
@@ -501,12 +561,13 @@ JNIEXPORT void JNICALL
Java_org_apache_arrow_dataset_jni_JniWrapper_closeDataset
/*
* Class: org_apache_arrow_dataset_jni_JniWrapper
* Method: createScanner
- * Signature:
(J[Ljava/lang/String;Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;JJ)J
+ * Signature:
+ *
(J[Ljava/lang/String;Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;JI;[Ljava/lang/String;J)J
*/
JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_jni_JniWrapper_createScanner(
JNIEnv* env, jobject, jlong dataset_id, jobjectArray columns,
- jobject substrait_projection, jobject substrait_filter,
- jlong batch_size, jlong memory_pool_id) {
+ jobject substrait_projection, jobject substrait_filter, jlong batch_size,
+ jint file_format_id, jobjectArray options, jlong memory_pool_id) {
JNI_METHOD_START
arrow::MemoryPool* pool =
reinterpret_cast<arrow::MemoryPool*>(memory_pool_id);
if (pool == nullptr) {
@@ -555,6 +616,12 @@ JNIEXPORT jlong JNICALL
Java_org_apache_arrow_dataset_jni_JniWrapper_createScann
}
JniAssertOkOrThrow(scanner_builder->Filter(*filter_expr));
}
+ if (file_format_id != -1 && options != nullptr) {
+ std::unordered_map<std::string, std::string> option_map = ToStringMap(env,
options);
+ std::shared_ptr<arrow::dataset::FragmentScanOptions> scan_options =
+ JniGetOrThrow(GetFragmentScanOptions(file_format_id, option_map));
+ JniAssertOkOrThrow(scanner_builder->FragmentScanOptions(scan_options));
+ }
JniAssertOkOrThrow(scanner_builder->BatchSize(batch_size));
auto scanner = JniGetOrThrow(scanner_builder->Finish());
@@ -668,14 +735,29 @@ JNIEXPORT void JNICALL
Java_org_apache_arrow_dataset_jni_JniWrapper_ensureS3Fina
/*
* Class: org_apache_arrow_dataset_file_JniWrapper
* Method: makeFileSystemDatasetFactory
- * Signature: (Ljava/lang/String;II)J
+ * Signature: (Ljava/lang/String;II;Ljava/lang/String;Ljava/lang/String)J
*/
JNIEXPORT jlong JNICALL
-Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljava_lang_String_2I(
- JNIEnv* env, jobject, jstring uri, jint file_format_id) {
+Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory(
+ JNIEnv* env, jobject, jstring uri, jint file_format_id, jobjectArray
options) {
JNI_METHOD_START
std::shared_ptr<arrow::dataset::FileFormat> file_format =
JniGetOrThrow(GetFileFormat(file_format_id));
+ if (options != nullptr) {
+ std::unordered_map<std::string, std::string> option_map = ToStringMap(env,
options);
+ std::shared_ptr<arrow::dataset::FragmentScanOptions> scan_options =
+ JniGetOrThrow(GetFragmentScanOptions(file_format_id, option_map));
+ file_format->default_fragment_scan_options = scan_options;
+#ifdef ARROW_CSV
+ if (file_format_id == 3) {
+ std::shared_ptr<arrow::dataset::CsvFileFormat> csv_file_format =
+
std::dynamic_pointer_cast<arrow::dataset::CsvFileFormat>(file_format);
+ csv_file_format->parse_options =
+
std::dynamic_pointer_cast<arrow::dataset::CsvFragmentScanOptions>(scan_options)
+ ->parse_options;
+ }
+#endif
+ }
arrow::dataset::FileSystemFactoryOptions options;
std::shared_ptr<arrow::dataset::DatasetFactory> d =
JniGetOrThrow(arrow::dataset::FileSystemDatasetFactory::Make(
@@ -686,16 +768,31 @@
Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljav
/*
* Class: org_apache_arrow_dataset_file_JniWrapper
- * Method: makeFileSystemDatasetFactory
- * Signature: ([Ljava/lang/String;II)J
+ * Method: makeFileSystemDatasetFactoryWithFiles
+ * Signature: ([Ljava/lang/String;II;[Ljava/lang/String)J
*/
JNIEXPORT jlong JNICALL
-Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory___3Ljava_lang_String_2I(
- JNIEnv* env, jobject, jobjectArray uris, jint file_format_id) {
+Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactoryWithFiles(
+ JNIEnv* env, jobject, jobjectArray uris, jint file_format_id, jobjectArray
options) {
JNI_METHOD_START
std::shared_ptr<arrow::dataset::FileFormat> file_format =
JniGetOrThrow(GetFileFormat(file_format_id));
+ if (options != nullptr) {
+ std::unordered_map<std::string, std::string> option_map = ToStringMap(env,
options);
+ std::shared_ptr<arrow::dataset::FragmentScanOptions> scan_options =
+ JniGetOrThrow(GetFragmentScanOptions(file_format_id, option_map));
+ file_format->default_fragment_scan_options = scan_options;
+#ifdef ARROW_CSV
+ if (file_format_id == 3) {
+ std::shared_ptr<arrow::dataset::CsvFileFormat> csv_file_format =
+
std::dynamic_pointer_cast<arrow::dataset::CsvFileFormat>(file_format);
+ csv_file_format->parse_options =
+
std::dynamic_pointer_cast<arrow::dataset::CsvFragmentScanOptions>(scan_options)
+ ->parse_options;
+ }
+#endif
+ }
arrow::dataset::FileSystemFactoryOptions options;
std::vector<std::string> uri_vec = ToStringVector(env, uris);
diff --git
a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java
b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java
index 36ac6288af..fcf124a61f 100644
---
a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java
+++
b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java
@@ -16,8 +16,10 @@
*/
package org.apache.arrow.dataset.file;
+import java.util.Optional;
import org.apache.arrow.dataset.jni.NativeDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.dataset.scanner.FragmentScanOptions;
import org.apache.arrow.memory.BufferAllocator;
/** Java binding of the C++ FileSystemDatasetFactory. */
@@ -25,19 +27,45 @@ public class FileSystemDatasetFactory extends
NativeDatasetFactory {
public FileSystemDatasetFactory(
BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat
format, String uri) {
- super(allocator, memoryPool, createNative(format, uri));
+ super(allocator, memoryPool, createNative(format, uri, Optional.empty()));
+ }
+
+ public FileSystemDatasetFactory(
+ BufferAllocator allocator,
+ NativeMemoryPool memoryPool,
+ FileFormat format,
+ String uri,
+ Optional<FragmentScanOptions> fragmentScanOptions) {
+ super(allocator, memoryPool, createNative(format, uri,
fragmentScanOptions));
}
public FileSystemDatasetFactory(
BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat
format, String[] uris) {
- super(allocator, memoryPool, createNative(format, uris));
+ super(allocator, memoryPool, createNative(format, uris, Optional.empty()));
+ }
+
+ public FileSystemDatasetFactory(
+ BufferAllocator allocator,
+ NativeMemoryPool memoryPool,
+ FileFormat format,
+ String[] uris,
+ Optional<FragmentScanOptions> fragmentScanOptions) {
+ super(allocator, memoryPool, createNative(format, uris,
fragmentScanOptions));
}
- private static long createNative(FileFormat format, String uri) {
- return JniWrapper.get().makeFileSystemDatasetFactory(uri, format.id());
+ private static long createNative(
+ FileFormat format, String uri, Optional<FragmentScanOptions>
fragmentScanOptions) {
+ return JniWrapper.get()
+ .makeFileSystemDatasetFactory(
+ uri, format.id(),
fragmentScanOptions.map(FragmentScanOptions::serialize).orElse(null));
}
- private static long createNative(FileFormat format, String[] uris) {
- return JniWrapper.get().makeFileSystemDatasetFactory(uris, format.id());
+ private static long createNative(
+ FileFormat format, String[] uris, Optional<FragmentScanOptions>
fragmentScanOptions) {
+ return JniWrapper.get()
+ .makeFileSystemDatasetFactoryWithFiles(
+ uris,
+ format.id(),
+
fragmentScanOptions.map(FragmentScanOptions::serialize).orElse(null));
}
}
diff --git
a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
index dfac293ccb..d2f842f99e 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
@@ -37,22 +37,26 @@ public class JniWrapper {
* intermediate shared_ptr of the factory instance.
*
* @param uri file uri to read, either a file or a directory
- * @param fileFormat file format ID
+ * @param fileFormat file format ID.
+ * @param serializedFragmentScanOptions serialized FragmentScanOptions.
* @return the native pointer of the
arrow::dataset::FileSystemDatasetFactory instance.
* @see FileFormat
*/
- public native long makeFileSystemDatasetFactory(String uri, int fileFormat);
+ public native long makeFileSystemDatasetFactory(
+ String uri, int fileFormat, String[] serializedFragmentScanOptions);
/**
* Create FileSystemDatasetFactory and return its native pointer. The
pointer is pointing to a
* intermediate shared_ptr of the factory instance.
*
* @param uris List of file uris to read, each path pointing to an
individual file
- * @param fileFormat file format ID
+ * @param fileFormat file format ID.
+ * @param serializedFragmentScanOptions serialized FragmentScanOptions.
* @return the native pointer of the
arrow::dataset::FileSystemDatasetFactory instance.
* @see FileFormat
*/
- public native long makeFileSystemDatasetFactory(String[] uris, int
fileFormat);
+ public native long makeFileSystemDatasetFactoryWithFiles(
+ String[] uris, int fileFormat, String[] serializedFragmentScanOptions);
/**
* Write the content in a {@link org.apache.arrow.c.ArrowArrayStream} into
files. This internally
diff --git
a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
index b5aa3d918a..6637c113d9 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
@@ -71,6 +71,8 @@ public class JniWrapper {
* @param substraitProjection substrait extended expression to evaluate for
project new columns
* @param substraitFilter substrait extended expression to evaluate for
apply filter
* @param batchSize batch size of scanned record batches.
+ * @param fileFormat file format ID.
+ * @param serializedFragmentScanOptions serialized FragmentScanOptions.
* @param memoryPool identifier of memory pool used in the native scanner.
* @return the native pointer of the arrow::dataset::Scanner instance.
*/
@@ -80,6 +82,8 @@ public class JniWrapper {
ByteBuffer substraitProjection,
ByteBuffer substraitFilter,
long batchSize,
+ int fileFormat,
+ String[] serializedFragmentScanOptions,
long memoryPool);
/**
diff --git
a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java
b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java
index 83a9ff1f32..8f8cdc49d4 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java
@@ -16,6 +16,7 @@
*/
package org.apache.arrow.dataset.jni;
+import org.apache.arrow.dataset.scanner.FragmentScanOptions;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.source.Dataset;
@@ -37,7 +38,13 @@ public class NativeDataset implements Dataset {
if (closed) {
throw new NativeInstanceReleasedException();
}
-
+ int fileFormatId = -1;
+ String[] serialized = null;
+ if (options.getFragmentScanOptions().isPresent()) {
+ FragmentScanOptions fragmentScanOptions =
options.getFragmentScanOptions().get();
+ fileFormatId = fragmentScanOptions.fileFormat().id();
+ serialized = fragmentScanOptions.serialize();
+ }
long scannerId =
JniWrapper.get()
.createScanner(
@@ -46,6 +53,8 @@ public class NativeDataset implements Dataset {
options.getSubstraitProjection().orElse(null),
options.getSubstraitFilter().orElse(null),
options.getBatchSize(),
+ fileFormatId,
+ serialized,
context.getMemoryPool().getNativeInstanceId());
return new NativeScanner(context, scannerId);
diff --git
a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/FragmentScanOptions.java
b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/FragmentScanOptions.java
new file mode 100644
index 0000000000..d48d0bd2b7
--- /dev/null
+++
b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/FragmentScanOptions.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.dataset.scanner;
+
+import org.apache.arrow.dataset.file.FileFormat;
+
+/** The file fragment scan options interface. It is used to transfer to JNI
call. */
+public interface FragmentScanOptions {
+ FileFormat fileFormat();
+
+ String[] serialize();
+}
diff --git
a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java
b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java
index 837016ad1e..68fc3943b3 100644
---
a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java
+++
b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java
@@ -27,6 +27,8 @@ public class ScanOptions {
private final Optional<ByteBuffer> substraitProjection;
private final Optional<ByteBuffer> substraitFilter;
+ private final Optional<FragmentScanOptions> fragmentScanOptions;
+
/**
* Constructor.
*
@@ -65,6 +67,7 @@ public class ScanOptions {
this.columns = columns;
this.substraitProjection = Optional.empty();
this.substraitFilter = Optional.empty();
+ this.fragmentScanOptions = Optional.empty();
}
public ScanOptions(long batchSize) {
@@ -87,12 +90,17 @@ public class ScanOptions {
return substraitFilter;
}
+ public Optional<FragmentScanOptions> getFragmentScanOptions() {
+ return fragmentScanOptions;
+ }
+
/** Builder for Options used during scanning. */
public static class Builder {
private final long batchSize;
private Optional<String[]> columns;
private ByteBuffer substraitProjection;
private ByteBuffer substraitFilter;
+ private FragmentScanOptions fragmentScanOptions;
/**
* Constructor.
@@ -140,6 +148,18 @@ public class ScanOptions {
return this;
}
+ /**
+ * Set the FragmentScanOptions.
+ *
+ * @param fragmentScanOptions fragment scan options
+ * @return the ScanOptions configured.
+ */
+ public Builder fragmentScanOptions(FragmentScanOptions
fragmentScanOptions) {
+ Preconditions.checkNotNull(fragmentScanOptions);
+ this.fragmentScanOptions = fragmentScanOptions;
+ return this;
+ }
+
public ScanOptions build() {
return new ScanOptions(this);
}
@@ -150,5 +170,6 @@ public class ScanOptions {
columns = builder.columns;
substraitProjection = Optional.ofNullable(builder.substraitProjection);
substraitFilter = Optional.ofNullable(builder.substraitFilter);
+ fragmentScanOptions = Optional.ofNullable(builder.fragmentScanOptions);
}
}
diff --git
a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvConvertOptions.java
b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvConvertOptions.java
new file mode 100644
index 0000000000..15e257896b
--- /dev/null
+++
b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvConvertOptions.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.dataset.scanner.csv;
+
+import java.util.Map;
+import java.util.Optional;
+import org.apache.arrow.c.ArrowSchema;
+
+public class CsvConvertOptions {
+
+ private final Map<String, String> configs;
+
+ private Optional<ArrowSchema> cSchema = Optional.empty();
+
+ public CsvConvertOptions(Map<String, String> configs) {
+ this.configs = configs;
+ }
+
+ public Optional<ArrowSchema> getArrowSchema() {
+ return cSchema;
+ }
+
+ public Map<String, String> getConfigs() {
+ return configs;
+ }
+
+ public void set(String key, String value) {
+ configs.put(key, value);
+ }
+
+ public void setArrowSchema(ArrowSchema cSchema) {
+ this.cSchema = Optional.of(cSchema);
+ }
+}
diff --git
a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java
b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java
new file mode 100644
index 0000000000..39271b5f06
--- /dev/null
+++
b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.dataset.scanner.csv;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.arrow.dataset.file.FileFormat;
+import org.apache.arrow.dataset.scanner.FragmentScanOptions;
+import org.apache.arrow.dataset.utils.MapUtil;
+
+public class CsvFragmentScanOptions implements FragmentScanOptions {
+ private final CsvConvertOptions convertOptions;
+ private final Map<String, String> readOptions;
+ private final Map<String, String> parseOptions;
+
+ /**
+ * CSV scan options, map to CPP struct CsvFragmentScanOptions. The key in
config map is the field
+ * name of mapping cpp struct
+ *
+ * @param convertOptions similar to CsvFragmentScanOptions#convert_options
in CPP, the ArrowSchema
+ * represents column_types, convert data option such as null value
recognition.
+ * @param readOptions similar to CsvFragmentScanOptions#read_options in CPP,
specify how to read
+ * the file such as block_size
+ * @param parseOptions similar to CsvFragmentScanOptions#parse_options in
CPP, parse file option
+ * such as delimiter
+ */
+ public CsvFragmentScanOptions(
+ CsvConvertOptions convertOptions,
+ Map<String, String> readOptions,
+ Map<String, String> parseOptions) {
+ this.convertOptions = convertOptions;
+ this.readOptions = readOptions;
+ this.parseOptions = parseOptions;
+ }
+
+ /**
+ * File format.
+ *
+ * @return file format.
+ */
+ @Override
+ public FileFormat fileFormat() {
+ return FileFormat.CSV;
+ }
+
+ /**
+ * This is an internal function to invoke by serializer. Serialize this
class to string array and
+ * then called by JNI call.
+ *
+ * @return string array as Map JNI bridge format.
+ */
+ @Override
+ public String[] serialize() {
+ Map<String, String> options =
+ Stream.concat(
+ Stream.concat(readOptions.entrySet().stream(),
parseOptions.entrySet().stream()),
+ convertOptions.getConfigs().entrySet().stream())
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+ if (convertOptions.getArrowSchema().isPresent()) {
+ options.put(
+ "column_types",
Long.toString(convertOptions.getArrowSchema().get().memoryAddress()));
+ }
+ return MapUtil.convertMapToStringArray(options);
+ }
+
+ public CsvConvertOptions getConvertOptions() {
+ return convertOptions;
+ }
+
+ public Map<String, String> getReadOptions() {
+ return readOptions;
+ }
+
+ public Map<String, String> getParseOptions() {
+ return parseOptions;
+ }
+}
diff --git
a/java/dataset/src/main/java/org/apache/arrow/dataset/utils/MapUtil.java
b/java/dataset/src/main/java/org/apache/arrow/dataset/utils/MapUtil.java
new file mode 100644
index 0000000000..4df6cf1e0e
--- /dev/null
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/utils/MapUtil.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.dataset.utils;
+
+import java.util.Map;
+
+/** The utility class for Map. */
+public class MapUtil {
+ private MapUtil() {}
+
+ /**
+ * Convert the map to string array as JNI bridge.
+ *
+ * @param config config map
+ * @return string array for serialization
+ */
+ public static String[] convertMapToStringArray(Map<String, String> config) {
+ if (config.isEmpty()) {
+ return null;
+ }
+ String[] configs = new String[config.size() * 2];
+ int i = 0;
+ for (Map.Entry<String, String> entry : config.entrySet()) {
+ configs[i++] = entry.getKey();
+ configs[i++] = entry.getValue();
+ }
+ return configs;
+ }
+}
diff --git
a/java/dataset/src/test/java/org/apache/arrow/dataset/TestFragmentScanOptions.java
b/java/dataset/src/test/java/org/apache/arrow/dataset/TestFragmentScanOptions.java
new file mode 100644
index 0000000000..9787e8308e
--- /dev/null
+++
b/java/dataset/src/test/java/org/apache/arrow/dataset/TestFragmentScanOptions.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.dataset;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.arrow.c.ArrowSchema;
+import org.apache.arrow.c.CDataDictionaryProvider;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.dataset.file.FileFormat;
+import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.dataset.scanner.csv.CsvConvertOptions;
+import org.apache.arrow.dataset.scanner.csv.CsvFragmentScanOptions;
+import org.apache.arrow.dataset.source.Dataset;
+import org.apache.arrow.dataset.source.DatasetFactory;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ValueIterableVector;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.hamcrest.collection.IsIterableContainingInOrder;
+import org.junit.jupiter.api.Test;
+
+public class TestFragmentScanOptions {
+
+ @Test
+ public void testCsvConvertOptions() throws Exception {
+ final Schema schema =
+ new Schema(
+ Arrays.asList(
+ Field.nullable("Id", new ArrowType.Int(32, true)),
+ Field.nullable("Name", new ArrowType.Utf8()),
+ Field.nullable("Language", new ArrowType.Utf8())),
+ null);
+ String path = "file://" + getClass().getResource("/").getPath() +
"/data/student.csv";
+ BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+ try (ArrowSchema cSchema = ArrowSchema.allocateNew(allocator);
+ CDataDictionaryProvider provider = new CDataDictionaryProvider()) {
+ Data.exportSchema(allocator, schema, provider, cSchema);
+ CsvConvertOptions convertOptions = new
CsvConvertOptions(ImmutableMap.of("delimiter", ";"));
+ convertOptions.setArrowSchema(cSchema);
+ CsvFragmentScanOptions fragmentScanOptions =
+ new CsvFragmentScanOptions(convertOptions, ImmutableMap.of(),
ImmutableMap.of());
+ ScanOptions options =
+ new ScanOptions.Builder(/*batchSize*/ 32768)
+ .columns(Optional.empty())
+ .fragmentScanOptions(fragmentScanOptions)
+ .build();
+ try (DatasetFactory datasetFactory =
+ new FileSystemDatasetFactory(
+ allocator, NativeMemoryPool.getDefault(), FileFormat.CSV,
path);
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()) {
+
+ assertEquals(schema.getFields(),
reader.getVectorSchemaRoot().getSchema().getFields());
+ int rowCount = 0;
+ while (reader.loadNextBatch()) {
+ final ValueIterableVector<Integer> idVector =
+ (ValueIterableVector<Integer>)
reader.getVectorSchemaRoot().getVector("Id");
+ assertThat(idVector.getValueIterable(),
IsIterableContainingInOrder.contains(1, 2, 3));
+ rowCount += reader.getVectorSchemaRoot().getRowCount();
+ }
+ assertEquals(3, rowCount);
+ }
+ }
+ }
+
+ @Test
+ public void testCsvConvertOptionsDelimiterNotSet() throws Exception {
+ final Schema schema =
+ new Schema(
+ Arrays.asList(
+ Field.nullable("Id", new ArrowType.Int(32, true)),
+ Field.nullable("Name", new ArrowType.Utf8()),
+ Field.nullable("Language", new ArrowType.Utf8())),
+ null);
+ String path = "file://" + getClass().getResource("/").getPath() +
"/data/student.csv";
+ BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+ try (ArrowSchema cSchema = ArrowSchema.allocateNew(allocator);
+ CDataDictionaryProvider provider = new CDataDictionaryProvider()) {
+ Data.exportSchema(allocator, schema, provider, cSchema);
+ CsvConvertOptions convertOptions = new
CsvConvertOptions(ImmutableMap.of());
+ convertOptions.setArrowSchema(cSchema);
+ CsvFragmentScanOptions fragmentScanOptions =
+ new CsvFragmentScanOptions(convertOptions, ImmutableMap.of(),
ImmutableMap.of());
+ ScanOptions options =
+ new ScanOptions.Builder(/*batchSize*/ 32768)
+ .columns(Optional.empty())
+ .fragmentScanOptions(fragmentScanOptions)
+ .build();
+ try (DatasetFactory datasetFactory =
+ new FileSystemDatasetFactory(
+ allocator, NativeMemoryPool.getDefault(), FileFormat.CSV,
path);
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()) {
+
+ assertEquals(schema.getFields(),
reader.getVectorSchemaRoot().getSchema().getFields());
+ int rowCount = 0;
+ while (reader.loadNextBatch()) {
+ final ValueIterableVector<Integer> idVector =
+ (ValueIterableVector<Integer>)
reader.getVectorSchemaRoot().getVector("Id");
+ assertThat(idVector.getValueIterable(),
IsIterableContainingInOrder.contains(1, 2, 3));
+ rowCount += reader.getVectorSchemaRoot().getRowCount();
+ }
+ assertEquals(3, rowCount);
+ }
+ }
+ }
+
+ @Test
+ public void testCsvConvertOptionsNoOption() throws Exception {
+ final Schema schema =
+ new Schema(
+ Collections.singletonList(Field.nullable("Id;Name;Language", new
ArrowType.Utf8())),
+ null);
+ String path = "file://" + getClass().getResource("/").getPath() +
"/data/student.csv";
+ BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+ ScanOptions options =
+ new ScanOptions.Builder(/*batchSize*/
32768).columns(Optional.empty()).build();
+ try (DatasetFactory datasetFactory =
+ new FileSystemDatasetFactory(
+ allocator, NativeMemoryPool.getDefault(), FileFormat.CSV,
path);
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()) {
+
+ assertEquals(schema.getFields(),
reader.getVectorSchemaRoot().getSchema().getFields());
+ int rowCount = 0;
+ while (reader.loadNextBatch()) {
+ final ValueIterableVector<String> idVector =
+ (ValueIterableVector<String>)
+ reader.getVectorSchemaRoot().getVector("Id;Name;Language");
+ assertThat(
+ idVector.getValueIterable(),
+ IsIterableContainingInOrder.contains(
+ "1;Juno;Java\n" + "2;Peter;Python\n" + "3;Celin;C++"));
+ rowCount += reader.getVectorSchemaRoot().getRowCount();
+ }
+ assertEquals(3, rowCount);
+ }
+ }
+}
diff --git a/java/dataset/src/test/resources/data/student.csv
b/java/dataset/src/test/resources/data/student.csv
new file mode 100644
index 0000000000..3291946092
--- /dev/null
+++ b/java/dataset/src/test/resources/data/student.csv
@@ -0,0 +1,4 @@
+Id;Name;Language
+1;Juno;Java
+2;Peter;Python
+3;Celin;C++