This is an automated email from the ASF dual-hosted git repository.

shiro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new b95628f  ARROW-4166: [Ruby] Add support for saving to and loading from buffer
b95628f is described below

commit b95628f2980fd800efe73ab0e4778dd209f7596c
Author: Kouhei Sutou <k...@clear-code.com>
AuthorDate: Mon Jan 7 08:54:59 2019 +0900

    ARROW-4166: [Ruby] Add support for saving to and loading from buffer
    
    Author: Kouhei Sutou <k...@clear-code.com>
    
    Closes #3320 from kou/ruby-table-io-buffer and squashes the following commits:
    
    7025e765 <Kouhei Sutou>  Add support for saving to and loading from buffer
---
 ruby/red-arrow/lib/arrow/table-loader.rb           |  46 ++++---
 ruby/red-arrow/lib/arrow/table-saver.rb            |  66 +++++-----
 ruby/red-arrow/test/test-table.rb                  | 139 ++++++++++++++-------
 .../lib/parquet/arrow-table-loadable.rb            |   7 +-
 .../red-parquet/lib/parquet/arrow-table-savable.rb |   6 +-
 ruby/red-parquet/test/test-arrow-table.rb          |   8 +-
 6 files changed, 177 insertions(+), 95 deletions(-)

diff --git a/ruby/red-arrow/lib/arrow/table-loader.rb b/ruby/red-arrow/lib/arrow/table-loader.rb
index a6ce9a1..9bfd410 100644
--- a/ruby/red-arrow/lib/arrow/table-loader.rb
+++ b/ruby/red-arrow/lib/arrow/table-loader.rb
@@ -18,14 +18,14 @@
 module Arrow
   class TableLoader
     class << self
-      def load(path, options={})
-        new(path, options).load
+      def load(output, options={})
+        new(output, options).load
       end
     end
 
-    def initialize(path, options={})
-      path = path.to_path if path.respond_to?(:to_path)
-      @path = path
+    def initialize(output, options={})
+      output = output.to_path if output.respond_to?(:to_path)
+      @output = output
       @options = options
       fill_options
     end
@@ -50,7 +50,7 @@ module Arrow
         __send__(custom_load_method)
       else
         # For backward compatibility.
-        __send__(custom_load_method, @path)
+        __send__(custom_load_method, @output)
       end
     end
 
@@ -60,11 +60,15 @@ module Arrow
         return
       end
 
-      extension = PathExtension.new(@path)
-      info = extension.extract
+      if @output.is_a?(Buffer)
+        info = {}
+      else
+        extension = PathExtension.new(@output)
+        info = extension.extract
+      end
       format = info[:format]
       @options = @options.dup
-      if respond_to?("load_as_#{format}", true)
+      if format and respond_to?("load_as_#{format}", true)
         @options[:format] ||= format.to_sym
       else
         @options[:format] ||= :arrow
@@ -74,6 +78,14 @@ module Arrow
       end
     end
 
+    def open_input_stream
+      if @output.is_a?(Buffer)
+        BufferInputStream.new(@output)
+      else
+        MemoryMappedInputStream.new(@output)
+      end
+    end
+
     def load_raw(input, reader)
       schema = reader.schema
       chunked_arrays = []
@@ -100,7 +112,7 @@ module Arrow
         RecordBatchStreamReader,
       ]
       reader_class_candidates.each do |reader_class_candidate|
-        input = MemoryMappedInputStream.new(@path)
+        input = open_input_stream
         begin
           reader = reader_class_candidate.new(input)
         rescue Arrow::Error
@@ -114,20 +126,20 @@ module Arrow
     end
 
     def load_as_batch
-      input = MemoryMappedInputStream.new(@path)
+      input = open_input_stream
       reader = RecordBatchFileReader.new(input)
       load_raw(input, reader)
     end
 
     def load_as_stream
-      input = MemoryMappedInputStream.new(@path)
+      input = open_input_stream
       reader = RecordBatchStreamReader.new(input)
       load_raw(input, reader)
     end
 
     if Arrow.const_defined?(:ORCFileReader)
       def load_as_orc
-        input = MemoryMappedInputStream.new(@path)
+        input = open_input_stream
         reader = ORCFileReader.new(input)
         field_indexes = @options[:field_indexes]
         reader.set_field_indexes(field_indexes) if field_indexes
@@ -140,11 +152,15 @@ module Arrow
     def load_as_csv
       options = @options.dup
       options.delete(:format)
-      CSVLoader.load(Pathname.new(@path), options)
+      if @output.is_a?(Buffer)
+        CSVLoader.load(@output.data.to_s, options)
+      else
+        CSVLoader.load(Pathname.new(@output), options)
+      end
     end
 
     def load_as_feather
-      input = MemoryMappedInputStream.new(@path)
+      input = open_input_stream
       reader = FeatherFileReader.new(input)
       table = reader.read
       table.instance_variable_set(:@input, input)
diff --git a/ruby/red-arrow/lib/arrow/table-saver.rb b/ruby/red-arrow/lib/arrow/table-saver.rb
index 99e6e49..817cc54 100644
--- a/ruby/red-arrow/lib/arrow/table-saver.rb
+++ b/ruby/red-arrow/lib/arrow/table-saver.rb
@@ -18,15 +18,15 @@
 module Arrow
   class TableSaver
     class << self
-      def save(table, path, options={})
-        new(table, path, options).save
+      def save(table, output, options={})
+        new(table, output, options).save
       end
     end
 
-    def initialize(table, path, options={})
+    def initialize(table, output, options={})
       @table = table
-      path = path.to_path if path.respond_to?(:to_path)
-      @path = path
+      output = output.to_path if output.respond_to?(:to_path)
+      @output = output
       @options = options
       fill_options
     end
@@ -51,7 +51,7 @@ module Arrow
         __send__(custom_save_method)
       else
         # For backward compatibility.
-        __send__(custom_save_method, @path)
+        __send__(custom_save_method, @output)
       end
     end
 
@@ -61,11 +61,15 @@ module Arrow
         return
       end
 
-      extension = PathExtension.new(@path)
-      info = extension.extract
+      if @output.is_a?(Buffer)
+        info = {}
+      else
+        extension = PathExtension.new(@output)
+        info = extension.extract
+      end
       format = info[:format]
       @options = @options.dup
-      if respond_to?("save_as_#{format}", true)
+      if format and respond_to?("save_as_#{format}", true)
         @options[:format] ||= format.to_sym
       else
         @options[:format] ||= :arrow
@@ -75,8 +79,30 @@ module Arrow
       end
     end
 
+    def open_raw_output_stream(&block)
+      if @output.is_a?(Buffer)
+        BufferOutputStream.open(@output, &block)
+      else
+        FileOutputStream.open(@output, false, &block)
+      end
+    end
+
+    def open_output_stream(&block)
+      compression = @options[:compression]
+      if compression
+        codec = Codec.new(compression)
+        open_raw_output_stream do |raw_output|
+          CompressedOutputStream.open(codec, raw_output) do |output|
+            yield(output)
+          end
+        end
+      else
+        open_raw_output_stream(&block)
+      end
+    end
+
     def save_raw(writer_class)
-      FileOutputStream.open(@path, false) do |output|
+      open_output_stream do |output|
         writer_class.open(output, @table.schema) do |writer|
           writer.write_table(@table)
         end
@@ -95,24 +121,8 @@ module Arrow
       save_raw(RecordBatchStreamWriter)
     end
 
-    def open_output
-      compression = @options[:compression]
-      if compression
-        codec = Codec.new(compression)
-        FileOutputStream.open(@path, false) do |raw_output|
-          CompressedOutputStream.open(codec, raw_output) do |output|
-            yield(output)
-          end
-        end
-      else
-        ::File.open(@path, "w") do |output|
-          yield(output)
-        end
-      end
-    end
-
     def save_as_csv
-      open_output do |output|
+      open_output_stream do |output|
         csv = CSV.new(output)
         names = @table.schema.fields.collect(&:name)
         csv << names
@@ -125,7 +135,7 @@ module Arrow
     end
 
     def save_as_feather
-      FileOutputStream.open(@path, false) do |output|
+      open_output_stream do |output|
         FeatherFileWriter.open(output) do |writer|
           writer.write(@table)
         end
diff --git a/ruby/red-arrow/test/test-table.rb b/ruby/red-arrow/test/test-table.rb
index 1576f77..2876f76 100644
--- a/ruby/red-arrow/test/test-table.rb
+++ b/ruby/red-arrow/test/test-table.rb
@@ -395,83 +395,128 @@ class TableTest < Test::Unit::TestCase
   end
 
   sub_test_case("#save and .load") do
-    sub_test_case(":format") do
-      test("default") do
-        file = Tempfile.new(["red-arrow", ".arrow"])
-        @table.save(file.path)
-        assert_equal(@table, Arrow::Table.load(file.path))
+    module SaveLoadFormatTests
+      def test_default
+        output = create_output(".arrow")
+        @table.save(output)
+        assert_equal(@table, Arrow::Table.load(output))
       end
 
-      test(":batch") do
-        file = Tempfile.new(["red-arrow", ".arrow"])
-        @table.save(file.path, :format => :batch)
-        assert_equal(@table, Arrow::Table.load(file.path, :format => :batch))
+      def test_batch
+        output = create_output(".arrow")
+        @table.save(output, format: :batch)
+        assert_equal(@table, Arrow::Table.load(output, format: :batch))
       end
 
-      test(":stream") do
-        file = Tempfile.new(["red-arrow", ".arrow"])
-        @table.save(file.path, :format => :stream)
-        assert_equal(@table, Arrow::Table.load(file.path, :format => :stream))
+      def test_stream
+        output = create_output(".arrow")
+        @table.save(output, format: :stream)
+        assert_equal(@table, Arrow::Table.load(output, format: :stream))
       end
 
-      test(":csv") do
-        file = Tempfile.new(["red-arrow", ".csv"])
-        @table.save(file.path, :format => :csv)
+      def test_csv
+        output = create_output(".csv")
+        @table.save(output, format: :csv)
         assert_equal(@table,
-                     Arrow::Table.load(file.path,
-                                       :format => :csv,
-                                       :schema => @table.schema))
+                     Arrow::Table.load(output,
+                                       format: :csv,
+                                       schema: @table.schema))
       end
 
-      test("csv.gz") do
-        file = Tempfile.new(["red-arrow", ".csv.gz"])
-        @table.save(file.path)
+      def test_csv_gz
+        output = create_output(".csv.gz")
+        @table.save(output,
+                    format: :csv,
+                    compression: :gzip)
         assert_equal(@table,
-                     Arrow::Table.load(file.path,
-                                       :format => :csv,
-                                       :compression => :gzip,
-                                       :schema => @table.schema))
+                     Arrow::Table.load(output,
+                                       format: :csv,
+                                       compression: :gzip,
+                                       schema: @table.schema))
       end
+    end
+
+    sub_test_case("path") do
+      sub_test_case(":format") do
+        include SaveLoadFormatTests
 
-      sub_test_case("load: auto detect") do
-        test("batch") do
-          file = Tempfile.new(["red-arrow", ".arrow"])
-          @table.save(file.path, :format => :batch)
-          assert_equal(@table, Arrow::Table.load(file.path))
+        def create_output(extension)
+          @file = Tempfile.new(["red-arrow", extension])
+          @file.path
         end
 
-        test("stream") do
-          file = Tempfile.new(["red-arrow", ".arrow"])
-          @table.save(file.path, :format => :stream)
-          assert_equal(@table, Arrow::Table.load(file.path))
+        sub_test_case("save: auto detect") do
+          test("csv") do
+            output = create_output(".csv")
+            @table.save(output)
+            assert_equal(@table,
+                         Arrow::Table.load(output,
+                                           format: :csv,
+                                           schema: @table.schema))
+          end
+
+          test("csv.gz") do
+            output = create_output(".csv.gz")
+            @table.save(output)
+            assert_equal(@table,
+                         Arrow::Table.load(output,
+                                           format: :csv,
+                                           compression: :gzip,
+                                           schema: @table.schema))
+          end
         end
 
-        test("csv") do
-          path = fixture_path("with-header.csv")
-          assert_equal(<<-TABLE, Arrow::Table.load(path, skip_lines: /^#/).to_s)
+        sub_test_case("load: auto detect") do
+          test("batch") do
+            output = create_output(".arrow")
+            @table.save(output, format: :batch)
+            assert_equal(@table, Arrow::Table.load(output))
+          end
+
+          test("stream") do
+            output = create_output(".arrow")
+            @table.save(output, format: :stream)
+            assert_equal(@table, Arrow::Table.load(output))
+          end
+
+          test("csv") do
+            path = fixture_path("with-header.csv")
+            table = Arrow::Table.load(path, skip_lines: /^\#/)
+            assert_equal(<<-TABLE, table.to_s)
        name    score
 0      alice      10
 1      bob        29
 2      chris      -1
-          TABLE
-        end
+            TABLE
+          end
 
-        test("csv.gz") do
-          file = Tempfile.new(["red-arrow", ".csv.gz"])
-          Zlib::GzipWriter.wrap(file) do |gz|
-            gz.write(<<-CSV)
+          test("csv.gz") do
+            file = Tempfile.new(["red-arrow", ".csv.gz"])
+            Zlib::GzipWriter.wrap(file) do |gz|
+              gz.write(<<-CSV)
 name,score
 alice,10
 bob,29
 chris,-1
-            CSV
-          end
-          assert_equal(<<-TABLE, Arrow::Table.load(file.path).to_s)
+              CSV
+            end
+            assert_equal(<<-TABLE, Arrow::Table.load(file.path).to_s)
        name    score
 0      alice      10
 1      bob        29
 2      chris      -1
           TABLE
+          end
+        end
+      end
+    end
+
+    sub_test_case("Buffer") do
+      sub_test_case(":format") do
+        include SaveLoadFormatTests
+
+        def create_output(extension)
+          Arrow::ResizableBuffer.new(1024)
         end
       end
     end
diff --git a/ruby/red-parquet/lib/parquet/arrow-table-loadable.rb b/ruby/red-parquet/lib/parquet/arrow-table-loadable.rb
index 4df527b..e3aa1ce 100644
--- a/ruby/red-parquet/lib/parquet/arrow-table-loadable.rb
+++ b/ruby/red-parquet/lib/parquet/arrow-table-loadable.rb
@@ -19,9 +19,12 @@ module Parquet
   module ArrowTableLoadable
     private
     def load_as_parquet
-      reader = Parquet::ArrowFileReader.new(@path)
+      input = open_input_stream
+      reader = Parquet::ArrowFileReader.new(input)
       reader.use_threads = (@options[:use_threads] != false)
-      reader.read_table
+      table = reader.read_table
+      table.instance_variable_set(:@input, input)
+      table
     end
   end
 end
diff --git a/ruby/red-parquet/lib/parquet/arrow-table-savable.rb b/ruby/red-parquet/lib/parquet/arrow-table-savable.rb
index 5d96d5f..7667381 100644
--- a/ruby/red-parquet/lib/parquet/arrow-table-savable.rb
+++ b/ruby/red-parquet/lib/parquet/arrow-table-savable.rb
@@ -20,8 +20,10 @@ module Parquet
     private
     def save_as_parquet
       chunk_size = @options[:chunk_size] || 1024 # TODO
-      Parquet::ArrowFileWriter.open(@table.schema, @path) do |writer|
-        writer.write_table(@table, chunk_size)
+      open_output_stream do |output|
+        Parquet::ArrowFileWriter.open(@table.schema, output) do |writer|
+          writer.write_table(@table, chunk_size)
+        end
       end
     end
   end
diff --git a/ruby/red-parquet/test/test-arrow-table.rb b/ruby/red-parquet/test/test-arrow-table.rb
index 258b417..1a565b6 100644
--- a/ruby/red-parquet/test/test-arrow-table.rb
+++ b/ruby/red-parquet/test/test-arrow-table.rb
@@ -40,9 +40,15 @@ class TestArrowTableReader < Test::Unit::TestCase
     @table = Arrow::Table.new(schema, [@count_column, @visible_column])
   end
 
-  def test_save_load
+  def test_save_load_path
     tempfile = Tempfile.open(["red-parquet", ".parquet"])
     @table.save(tempfile.path)
     assert_equal(@table, Arrow::Table.load(tempfile.path))
   end
+
+  def test_save_load_buffer
+    buffer = Arrow::ResizableBuffer.new(1024)
+    @table.save(buffer, format: :parquet)
+    assert_equal(@table, Arrow::Table.load(buffer, format: :parquet))
+  end
 end

Reply via email to