This is an automated email from the ASF dual-hosted git repository.

shiro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new b95628f ARROW-4166: [Ruby] Add support for saving to and loading from buffer b95628f is described below commit b95628f2980fd800efe73ab0e4778dd209f7596c Author: Kouhei Sutou <k...@clear-code.com> AuthorDate: Mon Jan 7 08:54:59 2019 +0900 ARROW-4166: [Ruby] Add support for saving to and loading from buffer Author: Kouhei Sutou <k...@clear-code.com> Closes #3320 from kou/ruby-table-io-buffer and squashes the following commits: 7025e765 <Kouhei Sutou> Add support for saving to and loading from buffer --- ruby/red-arrow/lib/arrow/table-loader.rb | 46 ++++--- ruby/red-arrow/lib/arrow/table-saver.rb | 66 +++++----- ruby/red-arrow/test/test-table.rb | 139 ++++++++++++++------- .../lib/parquet/arrow-table-loadable.rb | 7 +- .../red-parquet/lib/parquet/arrow-table-savable.rb | 6 +- ruby/red-parquet/test/test-arrow-table.rb | 8 +- 6 files changed, 177 insertions(+), 95 deletions(-) diff --git a/ruby/red-arrow/lib/arrow/table-loader.rb b/ruby/red-arrow/lib/arrow/table-loader.rb index a6ce9a1..9bfd410 100644 --- a/ruby/red-arrow/lib/arrow/table-loader.rb +++ b/ruby/red-arrow/lib/arrow/table-loader.rb @@ -18,14 +18,14 @@ module Arrow class TableLoader class << self - def load(path, options={}) - new(path, options).load + def load(output, options={}) + new(output, options).load end end - def initialize(path, options={}) - path = path.to_path if path.respond_to?(:to_path) - @path = path + def initialize(output, options={}) + output = output.to_path if output.respond_to?(:to_path) + @output = output @options = options fill_options end @@ -50,7 +50,7 @@ module Arrow __send__(custom_load_method) else # For backward compatibility. 
- __send__(custom_load_method, @path) + __send__(custom_load_method, @output) end end @@ -60,11 +60,15 @@ module Arrow return end - extension = PathExtension.new(@path) - info = extension.extract + if @output.is_a?(Buffer) + info = {} + else + extension = PathExtension.new(@output) + info = extension.extract + end format = info[:format] @options = @options.dup - if respond_to?("load_as_#{format}", true) + if format and respond_to?("load_as_#{format}", true) @options[:format] ||= format.to_sym else @options[:format] ||= :arrow @@ -74,6 +78,14 @@ module Arrow end end + def open_input_stream + if @output.is_a?(Buffer) + BufferInputStream.new(@output) + else + MemoryMappedInputStream.new(@output) + end + end + def load_raw(input, reader) schema = reader.schema chunked_arrays = [] @@ -100,7 +112,7 @@ module Arrow RecordBatchStreamReader, ] reader_class_candidates.each do |reader_class_candidate| - input = MemoryMappedInputStream.new(@path) + input = open_input_stream begin reader = reader_class_candidate.new(input) rescue Arrow::Error @@ -114,20 +126,20 @@ module Arrow end def load_as_batch - input = MemoryMappedInputStream.new(@path) + input = open_input_stream reader = RecordBatchFileReader.new(input) load_raw(input, reader) end def load_as_stream - input = MemoryMappedInputStream.new(@path) + input = open_input_stream reader = RecordBatchStreamReader.new(input) load_raw(input, reader) end if Arrow.const_defined?(:ORCFileReader) def load_as_orc - input = MemoryMappedInputStream.new(@path) + input = open_input_stream reader = ORCFileReader.new(input) field_indexes = @options[:field_indexes] reader.set_field_indexes(field_indexes) if field_indexes @@ -140,11 +152,15 @@ module Arrow def load_as_csv options = @options.dup options.delete(:format) - CSVLoader.load(Pathname.new(@path), options) + if @output.is_a?(Buffer) + CSVLoader.load(@output.data.to_s, options) + else + CSVLoader.load(Pathname.new(@output), options) + end end def load_as_feather - input = 
MemoryMappedInputStream.new(@path) + input = open_input_stream reader = FeatherFileReader.new(input) table = reader.read table.instance_variable_set(:@input, input) diff --git a/ruby/red-arrow/lib/arrow/table-saver.rb b/ruby/red-arrow/lib/arrow/table-saver.rb index 99e6e49..817cc54 100644 --- a/ruby/red-arrow/lib/arrow/table-saver.rb +++ b/ruby/red-arrow/lib/arrow/table-saver.rb @@ -18,15 +18,15 @@ module Arrow class TableSaver class << self - def save(table, path, options={}) - new(table, path, options).save + def save(table, output, options={}) + new(table, output, options).save end end - def initialize(table, path, options={}) + def initialize(table, output, options={}) @table = table - path = path.to_path if path.respond_to?(:to_path) - @path = path + output = output.to_path if output.respond_to?(:to_path) + @output = output @options = options fill_options end @@ -51,7 +51,7 @@ module Arrow __send__(custom_save_method) else # For backward compatibility. - __send__(custom_save_method, @path) + __send__(custom_save_method, @output) end end @@ -61,11 +61,15 @@ module Arrow return end - extension = PathExtension.new(@path) - info = extension.extract + if @output.is_a?(Buffer) + info = {} + else + extension = PathExtension.new(@output) + info = extension.extract + end format = info[:format] @options = @options.dup - if respond_to?("save_as_#{format}", true) + if format and respond_to?("save_as_#{format}", true) @options[:format] ||= format.to_sym else @options[:format] ||= :arrow @@ -75,8 +79,30 @@ module Arrow end end + def open_raw_output_stream(&block) + if @output.is_a?(Buffer) + BufferOutputStream.open(@output, &block) + else + FileOutputStream.open(@output, false, &block) + end + end + + def open_output_stream(&block) + compression = @options[:compression] + if compression + codec = Codec.new(compression) + open_raw_output_stream do |raw_output| + CompressedOutputStream.open(codec, raw_output) do |output| + yield(output) + end + end + else + 
open_raw_output_stream(&block) + end + end + def save_raw(writer_class) - FileOutputStream.open(@path, false) do |output| + open_output_stream do |output| writer_class.open(output, @table.schema) do |writer| writer.write_table(@table) end @@ -95,24 +121,8 @@ module Arrow save_raw(RecordBatchStreamWriter) end - def open_output - compression = @options[:compression] - if compression - codec = Codec.new(compression) - FileOutputStream.open(@path, false) do |raw_output| - CompressedOutputStream.open(codec, raw_output) do |output| - yield(output) - end - end - else - ::File.open(@path, "w") do |output| - yield(output) - end - end - end - def save_as_csv - open_output do |output| + open_output_stream do |output| csv = CSV.new(output) names = @table.schema.fields.collect(&:name) csv << names @@ -125,7 +135,7 @@ module Arrow end def save_as_feather - FileOutputStream.open(@path, false) do |output| + open_output_stream do |output| FeatherFileWriter.open(output) do |writer| writer.write(@table) end diff --git a/ruby/red-arrow/test/test-table.rb b/ruby/red-arrow/test/test-table.rb index 1576f77..2876f76 100644 --- a/ruby/red-arrow/test/test-table.rb +++ b/ruby/red-arrow/test/test-table.rb @@ -395,83 +395,128 @@ class TableTest < Test::Unit::TestCase end sub_test_case("#save and .load") do - sub_test_case(":format") do - test("default") do - file = Tempfile.new(["red-arrow", ".arrow"]) - @table.save(file.path) - assert_equal(@table, Arrow::Table.load(file.path)) + module SaveLoadFormatTests + def test_default + output = create_output(".arrow") + @table.save(output) + assert_equal(@table, Arrow::Table.load(output)) end - test(":batch") do - file = Tempfile.new(["red-arrow", ".arrow"]) - @table.save(file.path, :format => :batch) - assert_equal(@table, Arrow::Table.load(file.path, :format => :batch)) + def test_batch + output = create_output(".arrow") + @table.save(output, format: :batch) + assert_equal(@table, Arrow::Table.load(output, format: :batch)) end - test(":stream") do - 
file = Tempfile.new(["red-arrow", ".arrow"]) - @table.save(file.path, :format => :stream) - assert_equal(@table, Arrow::Table.load(file.path, :format => :stream)) + def test_stream + output = create_output(".arrow") + @table.save(output, format: :stream) + assert_equal(@table, Arrow::Table.load(output, format: :stream)) end - test(":csv") do - file = Tempfile.new(["red-arrow", ".csv"]) - @table.save(file.path, :format => :csv) + def test_csv + output = create_output(".csv") + @table.save(output, format: :csv) assert_equal(@table, - Arrow::Table.load(file.path, - :format => :csv, - :schema => @table.schema)) + Arrow::Table.load(output, + format: :csv, + schema: @table.schema)) end - test("csv.gz") do - file = Tempfile.new(["red-arrow", ".csv.gz"]) - @table.save(file.path) + def test_csv_gz + output = create_output(".csv.gz") + @table.save(output, + format: :csv, + compression: :gzip) assert_equal(@table, - Arrow::Table.load(file.path, - :format => :csv, - :compression => :gzip, - :schema => @table.schema)) + Arrow::Table.load(output, + format: :csv, + compression: :gzip, + schema: @table.schema)) end + end + + sub_test_case("path") do + sub_test_case(":format") do + include SaveLoadFormatTests - sub_test_case("load: auto detect") do - test("batch") do - file = Tempfile.new(["red-arrow", ".arrow"]) - @table.save(file.path, :format => :batch) - assert_equal(@table, Arrow::Table.load(file.path)) + def create_output(extension) + @file = Tempfile.new(["red-arrow", extension]) + @file.path end - test("stream") do - file = Tempfile.new(["red-arrow", ".arrow"]) - @table.save(file.path, :format => :stream) - assert_equal(@table, Arrow::Table.load(file.path)) + sub_test_case("save: auto detect") do + test("csv") do + output = create_output(".csv") + @table.save(output) + assert_equal(@table, + Arrow::Table.load(output, + format: :csv, + schema: @table.schema)) + end + + test("csv.gz") do + output = create_output(".csv.gz") + @table.save(output) + assert_equal(@table, + 
Arrow::Table.load(output, + format: :csv, + compression: :gzip, + schema: @table.schema)) + end end - test("csv") do - path = fixture_path("with-header.csv") - assert_equal(<<-TABLE, Arrow::Table.load(path, skip_lines: /^#/).to_s) + sub_test_case("load: auto detect") do + test("batch") do + output = create_output(".arrow") + @table.save(output, format: :batch) + assert_equal(@table, Arrow::Table.load(output)) + end + + test("stream") do + output = create_output(".arrow") + @table.save(output, format: :stream) + assert_equal(@table, Arrow::Table.load(output)) + end + + test("csv") do + path = fixture_path("with-header.csv") + table = Arrow::Table.load(path, skip_lines: /^\#/) + assert_equal(<<-TABLE, table.to_s) name score 0 alice 10 1 bob 29 2 chris -1 - TABLE - end + TABLE + end - test("csv.gz") do - file = Tempfile.new(["red-arrow", ".csv.gz"]) - Zlib::GzipWriter.wrap(file) do |gz| - gz.write(<<-CSV) + test("csv.gz") do + file = Tempfile.new(["red-arrow", ".csv.gz"]) + Zlib::GzipWriter.wrap(file) do |gz| + gz.write(<<-CSV) name,score alice,10 bob,29 chris,-1 - CSV - end - assert_equal(<<-TABLE, Arrow::Table.load(file.path).to_s) + CSV + end + assert_equal(<<-TABLE, Arrow::Table.load(file.path).to_s) name score 0 alice 10 1 bob 29 2 chris -1 TABLE + end + end + end + end + + sub_test_case("Buffer") do + sub_test_case(":format") do + include SaveLoadFormatTests + + def create_output(extension) + Arrow::ResizableBuffer.new(1024) end end end diff --git a/ruby/red-parquet/lib/parquet/arrow-table-loadable.rb b/ruby/red-parquet/lib/parquet/arrow-table-loadable.rb index 4df527b..e3aa1ce 100644 --- a/ruby/red-parquet/lib/parquet/arrow-table-loadable.rb +++ b/ruby/red-parquet/lib/parquet/arrow-table-loadable.rb @@ -19,9 +19,12 @@ module Parquet module ArrowTableLoadable private def load_as_parquet - reader = Parquet::ArrowFileReader.new(@path) + input = open_input_stream + reader = Parquet::ArrowFileReader.new(input) reader.use_threads = (@options[:use_threads] != false) - 
reader.read_table + table = reader.read_table + table.instance_variable_set(:@input, input) + table end end end diff --git a/ruby/red-parquet/lib/parquet/arrow-table-savable.rb b/ruby/red-parquet/lib/parquet/arrow-table-savable.rb index 5d96d5f..7667381 100644 --- a/ruby/red-parquet/lib/parquet/arrow-table-savable.rb +++ b/ruby/red-parquet/lib/parquet/arrow-table-savable.rb @@ -20,8 +20,10 @@ module Parquet private def save_as_parquet chunk_size = @options[:chunk_size] || 1024 # TODO - Parquet::ArrowFileWriter.open(@table.schema, @path) do |writer| - writer.write_table(@table, chunk_size) + open_output_stream do |output| + Parquet::ArrowFileWriter.open(@table.schema, output) do |writer| + writer.write_table(@table, chunk_size) + end end end end diff --git a/ruby/red-parquet/test/test-arrow-table.rb b/ruby/red-parquet/test/test-arrow-table.rb index 258b417..1a565b6 100644 --- a/ruby/red-parquet/test/test-arrow-table.rb +++ b/ruby/red-parquet/test/test-arrow-table.rb @@ -40,9 +40,15 @@ class TestArrowTableReader < Test::Unit::TestCase @table = Arrow::Table.new(schema, [@count_column, @visible_column]) end - def test_save_load + def test_save_load_path tempfile = Tempfile.open(["red-parquet", ".parquet"]) @table.save(tempfile.path) assert_equal(@table, Arrow::Table.load(tempfile.path)) end + + def test_save_load_buffer + buffer = Arrow::ResizableBuffer.new(1024) + @table.save(buffer, format: :parquet) + assert_equal(@table, Arrow::Table.load(buffer, format: :parquet)) + end end