This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 78351cec5a ARROW-17303: [Java][Dataset] Read Arrow IPC files by 
NativeDatasetFactory (#13760) (#13811)
78351cec5a is described below

commit 78351cec5afe97f94050118e0fdeaa14f385178c
Author: Igor Suhorukov <[email protected]>
AuthorDate: Mon Aug 8 22:45:28 2022 +0300

    ARROW-17303: [Java][Dataset] Read Arrow IPC files by NativeDatasetFactory 
(#13760) (#13811)
    
    This PR allow developers to create Dataset from ARROW IPC files in JVM code 
like:
    `FileSystemDatasetFactory factory = new 
FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
                FileFormat.ARROW_IPC, arrowDatasetURL);`
    
    It is foundation for Apache Spark arrow data source to process huge 
existing partitioned datasets in ARROW file format without additional data 
format conversion
    
    Lead-authored-by: Igor Suhorukov <[email protected]>
    Co-authored-by: igor.suhorukov <[email protected]>
    Signed-off-by: David Li <[email protected]>
---
 java/dataset/src/main/cpp/jni_wrapper.cc           |  2 ++
 .../org/apache/arrow/dataset/file/FileFormat.java  |  1 +
 .../arrow/dataset/file/TestFileSystemDataset.java  | 41 ++++++++++++++++++++++
 3 files changed, 44 insertions(+)

diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc 
b/java/dataset/src/main/cpp/jni_wrapper.cc
index e96dfb8aed..d088163903 100644
--- a/java/dataset/src/main/cpp/jni_wrapper.cc
+++ b/java/dataset/src/main/cpp/jni_wrapper.cc
@@ -89,6 +89,8 @@ arrow::Result<std::shared_ptr<arrow::dataset::FileFormat>> 
GetFileFormat(
   switch (file_format_id) {
     case 0:
       return std::make_shared<arrow::dataset::ParquetFileFormat>();
+    case 1:
+      return std::make_shared<arrow::dataset::IpcFileFormat>();
     default:
       std::string error_message =
           "illegal file format id: " + std::to_string(file_format_id);
diff --git 
a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java 
b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java
index 107fc2f71d..343e458ce2 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java
@@ -22,6 +22,7 @@ package org.apache.arrow.dataset.file;
  */
 public enum FileFormat {
   PARQUET(0),
+  ARROW_IPC(1),
   NONE(-1);
 
   private final int id;
diff --git 
a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java
 
b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java
index 92610b1145..2fd8a19bac 100644
--- 
a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java
+++ 
b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java
@@ -23,6 +23,7 @@ import static 
org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -46,11 +47,15 @@ import org.apache.arrow.dataset.jni.TestNativeDataset;
 import org.apache.arrow.dataset.scanner.ScanOptions;
 import org.apache.arrow.util.AutoCloseables;
 import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.VectorLoader;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowFileWriter;
 import org.apache.arrow.vector.ipc.ArrowReader;
 import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
@@ -316,6 +321,42 @@ public class TestFileSystemDataset extends 
TestNativeDataset {
     AutoCloseables.close(factory);
   }
 
+  @Test
+  public void testBaseArrowIpcRead() throws Exception {
+    File dataFile = TMP.newFile();
+    Schema sourceSchema = new 
Schema(Collections.singletonList(Field.nullable("ints", new ArrowType.Int(32, 
true))));
+    try (VectorSchemaRoot root = VectorSchemaRoot.create(sourceSchema, 
rootAllocator());
+         FileOutputStream sink = new FileOutputStream(dataFile);
+         ArrowFileWriter writer = new ArrowFileWriter(root, 
/*dictionaryProvider=*/null, sink.getChannel())) {
+      IntVector ints = (IntVector) root.getVector(0);
+      ints.setSafe(0, 0);
+      ints.setSafe(1, 1024);
+      ints.setSafe(2, Integer.MAX_VALUE);
+      root.setRowCount(3);
+      writer.start();
+      writer.writeBatch();
+      writer.end();
+    }
+
+    String arrowDataURI = dataFile.toURI().toString();
+    FileSystemDatasetFactory factory = new 
FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.ARROW_IPC, arrowDataURI);
+    ScanOptions options = new ScanOptions(100);
+    Schema schema = inferResultSchemaFromFactory(factory, options);
+    List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);
+
+    assertSingleTaskProduced(factory, options);
+    assertEquals(1, datum.size());
+    assertEquals(1, schema.getFields().size());
+    assertEquals("ints", schema.getFields().get(0).getName());
+
+    String expectedJsonUnordered = String.format("[[0],[1024],[%d]]", 
Integer.MAX_VALUE);
+    checkParquetReadResult(schema, expectedJsonUnordered, datum);
+
+    AutoCloseables.close(datum);
+    AutoCloseables.close(factory);
+  }
+
   private void checkParquetReadResult(Schema schema, String expectedJson, 
List<ArrowRecordBatch> actual)
       throws IOException {
     final ObjectMapper json = new ObjectMapper();

Reply via email to