This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new e8d54ea765 ARROW-17789: [Java][Docs] Update Java Dataset documentation with latest changes (#14382)
e8d54ea765 is described below

commit e8d54ea765ae7ba63b8f42c29ec855d656e85dc8
Author: david dali susanibar arce <[email protected]>
AuthorDate: Thu Oct 13 07:08:24 2022 -0500

    ARROW-17789: [Java][Docs] Update Java Dataset documentation with latest changes (#14382)
    
    Authored-by: david dali susanibar arce <[email protected]>
    Signed-off-by: David Li <[email protected]>
---
 docs/source/java/dataset.rst | 121 ++++++++++++++++++++++++++++++++-----------
 1 file changed, 91 insertions(+), 30 deletions(-)

diff --git a/docs/source/java/dataset.rst b/docs/source/java/dataset.rst
index acc14e11b5..6315932a57 100644
--- a/docs/source/java/dataset.rst
+++ b/docs/source/java/dataset.rst
@@ -32,31 +32,50 @@ is not designed only for querying files but can be extended to serve all
 possible data sources such as from inter-process communication or from other
 network locations, etc.
 
+.. contents::
+
 Getting Started
 ===============
 
+Currently supported file formats are:
+
+- Apache Arrow (``.arrow``)
+- Apache ORC (``.orc``)
+- Apache Parquet (``.parquet``)
+- Comma-Separated Values (``.csv``)
+
 Below shows the simplest example of using Dataset to query a Parquet file in Java:
 
 .. code-block:: Java
 
     // read data from file /opt/example.parquet
     String uri = "file:/opt/example.parquet";
-    BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
-    DatasetFactory factory = new FileSystemDatasetFactory(allocator,
-        NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
-    Dataset dataset = factory.finish();
-    Scanner scanner = dataset.newScan(new ScanOptions(100)));
-    List<ArrowRecordBatch> batches = StreamSupport.stream(
-        scanner.scan().spliterator(), false)
-            .flatMap(t -> stream(t.execute()))
-            .collect(Collectors.toList());
-
-    // do something with read record batches, for example:
-    analyzeArrowData(batches);
-
-    // finished the analysis of the data, close all resources:
-    AutoCloseables.close(batches);
-    AutoCloseables.close(factory, dataset, scanner);
+    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        BufferAllocator allocator = new RootAllocator();
+        DatasetFactory datasetFactory = new FileSystemDatasetFactory(
+                allocator, NativeMemoryPool.getDefault(),
+                FileFormat.PARQUET, uri);
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()
+    ) {
+        List<ArrowRecordBatch> batches = new ArrayList<>();
+        while (reader.loadNextBatch()) {
+            try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
+                final VectorUnloader unloader = new VectorUnloader(root);
+                batches.add(unloader.getRecordBatch());
+            }
+        }
+
+        // do something with read record batches, for example:
+        analyzeArrowData(batches);
+
+        // finished the analysis of the data, close all resources:
+        AutoCloseables.close(batches);
+    } catch (Exception e) {
+        e.printStackTrace();
+    }
 
 .. note::
     ``ArrowRecordBatch`` is a low-level composite Arrow data exchange format
@@ -65,6 +84,9 @@ Below shows the simplest example of using Dataset to query a Parquet file in Java:
     aware container ``VectorSchemaRoot`` by which user could be able to access
     decoded data conveniently in Java.
 
+    The ``ScanOptions batchSize`` argument takes effect only if it is set to a
+    value smaller than the number of rows in the record batch.
+
 .. seealso::
    Load record batches with :doc:`VectorSchemaRoot <vector_schema_root>`.
 
@@ -104,7 +126,7 @@ within method ``Scanner::schema()``:
 .. code-block:: Java
 
     Scanner scanner = dataset.newScan(
-        new ScanOptions(100, Optional.of(new String[] {"id", "name"})));
+        new ScanOptions(32768, Optional.of(new String[] {"id", "name"})));
     Schema projectedSchema = scanner.schema();
 
 .. _java-dataset-projection:
@@ -119,20 +141,20 @@ in the projection list will be accepted. For example:
 .. code-block:: Java
 
     String[] projection = new String[] {"id", "name"};
-    ScanOptions options = new ScanOptions(100, Optional.of(projection));
+    ScanOptions options = new ScanOptions(32768, Optional.of(projection));
 
 If no projection is needed, leave the optional projection argument absent in
 ScanOptions:
 
 .. code-block:: Java
 
-    ScanOptions options = new ScanOptions(100, Optional.empty());
+    ScanOptions options = new ScanOptions(32768, Optional.empty());
 
 Or use the shortcut constructor:
 
 .. code-block:: Java
 
-    ScanOptions options = new ScanOptions(100);
+    ScanOptions options = new ScanOptions(32768);
 
 Then all columns will be emitted during scanning.
 
@@ -210,21 +232,60 @@ be thrown during scanning.
     dataset instances. Once the Java buffers are created the passed allocator
     will become their parent allocator.
 
+Usage Notes
+===========
+
 Native Object Resource Management
-=================================
+---------------------------------
+
 As another result of relying on JNI, all components related to
-``FileSystemDataset`` should be closed manually to release the corresponding
-native objects after using. For example:
+``FileSystemDataset`` should be closed manually, or managed with try-with-resources,
+to release the corresponding native objects after use. For example:
 
 .. code-block:: Java
 
-    DatasetFactory factory = new FileSystemDatasetFactory(allocator,
-        NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
-    Dataset dataset = factory.finish();
-    Scanner scanner = dataset.newScan(new ScanOptions(100));
+    String uri = "file:/opt/example.parquet";
+    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        BufferAllocator allocator = new RootAllocator();
+        DatasetFactory factory = new FileSystemDatasetFactory(
+                allocator, NativeMemoryPool.getDefault(),
+                FileFormat.PARQUET, uri);
+        Dataset dataset = factory.finish();
+        Scanner scanner = dataset.newScan(options)
+    ) {
+
+        // do something
+
+    } catch (Exception e) {
+        e.printStackTrace();
+    }
 
-    // do something
+If the user forgets to close them, native objects may be leaked.
 
-    AutoCloseables.close(factory, dataset, scanner);
+BatchSize
+---------
 
-If user forgets to close them then native object leakage might be caused.
+The ``batchSize`` argument of ``ScanOptions`` is a limit on the size of an individual batch.
+
+For example, let's try to read a Parquet file with gzip compression and 3 row groups:
+
+.. code-block::
+
+   # Configure ScanOptions as:
+   ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+
+   $ parquet-tools meta data4_3rg_gzip.parquet
+   file schema: schema
+   age:         OPTIONAL INT64 R:0 D:1
+   name:        OPTIONAL BINARY L:STRING R:0 D:1
+   row group 1: RC:4 TS:182 OFFSET:4
+   row group 2: RC:4 TS:190 OFFSET:420
+   row group 3: RC:3 TS:179 OFFSET:838
+
+Here, we set the batchSize in ScanOptions to 32768. Because the first row
+group has only 4 rows, which is fewer than 32768, the first batch returned
+contains just those 4 rows. The scanner will not combine smaller batches to
+reach the limit, but it will split larger batches to stay under it. So if a
+row group had more than 32768 rows, it would be split into batches of 32768
+rows or fewer.
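The splitting rule the new section describes can be sketched as plain arithmetic, independent of the Arrow API. The helper below is hypothetical (not part of Arrow); it only models the documented behavior that a row group is emitted as successive batches of at most ``batchSize`` rows, and that smaller groups are not combined:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSizeSketch {
    // Hypothetical model of the documented batchSize behavior: a row group of
    // rowGroupRows rows is scanned as consecutive batches of at most batchSize
    // rows; a group smaller than batchSize yields a single small batch.
    static List<Integer> batchRowCounts(int rowGroupRows, int batchSize) {
        List<Integer> counts = new ArrayList<>();
        int remaining = rowGroupRows;
        while (remaining > 0) {
            counts.add(Math.min(remaining, batchSize));
            remaining -= batchSize;
        }
        return counts;
    }

    public static void main(String[] args) {
        // The 4-row row group from the example above: one batch of 4 rows.
        System.out.println(batchRowCounts(4, 32768));       // [4]
        // A row group larger than batchSize is split into chunks <= 32768.
        System.out.println(batchRowCounts(100_000, 32768)); // [32768, 32768, 32768, 1696]
    }
}
```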
