This is an automated email from the ASF dual-hosted git repository.
shiro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new b95628f ARROW-4166: [Ruby] Add support for saving to and loading from buffer
b95628f is described below
commit b95628f2980fd800efe73ab0e4778dd209f7596c
Author: Kouhei Sutou <[email protected]>
AuthorDate: Mon Jan 7 08:54:59 2019 +0900
ARROW-4166: [Ruby] Add support for saving to and loading from buffer
Author: Kouhei Sutou <[email protected]>
Closes #3320 from kou/ruby-table-io-buffer and squashes the following commits:
7025e765 <Kouhei Sutou> Add support for saving to and loading from buffer
---
ruby/red-arrow/lib/arrow/table-loader.rb | 46 ++++---
ruby/red-arrow/lib/arrow/table-saver.rb | 66 +++++-----
ruby/red-arrow/test/test-table.rb | 139 ++++++++++++++-------
.../lib/parquet/arrow-table-loadable.rb | 7 +-
.../red-parquet/lib/parquet/arrow-table-savable.rb | 6 +-
ruby/red-parquet/test/test-arrow-table.rb | 8 +-
6 files changed, 177 insertions(+), 95 deletions(-)
diff --git a/ruby/red-arrow/lib/arrow/table-loader.rb b/ruby/red-arrow/lib/arrow/table-loader.rb
index a6ce9a1..9bfd410 100644
--- a/ruby/red-arrow/lib/arrow/table-loader.rb
+++ b/ruby/red-arrow/lib/arrow/table-loader.rb
@@ -18,14 +18,14 @@
module Arrow
class TableLoader
class << self
- def load(path, options={})
- new(path, options).load
+ def load(output, options={})
+ new(output, options).load
end
end
- def initialize(path, options={})
- path = path.to_path if path.respond_to?(:to_path)
- @path = path
+ def initialize(output, options={})
+ output = output.to_path if output.respond_to?(:to_path)
+ @output = output
@options = options
fill_options
end
@@ -50,7 +50,7 @@ module Arrow
__send__(custom_load_method)
else
# For backward compatibility.
- __send__(custom_load_method, @path)
+ __send__(custom_load_method, @output)
end
end
@@ -60,11 +60,15 @@ module Arrow
return
end
- extension = PathExtension.new(@path)
- info = extension.extract
+ if @output.is_a?(Buffer)
+ info = {}
+ else
+ extension = PathExtension.new(@output)
+ info = extension.extract
+ end
format = info[:format]
@options = @options.dup
- if respond_to?("load_as_#{format}", true)
+ if format and respond_to?("load_as_#{format}", true)
@options[:format] ||= format.to_sym
else
@options[:format] ||= :arrow
@@ -74,6 +78,14 @@ module Arrow
end
end
+ def open_input_stream
+ if @output.is_a?(Buffer)
+ BufferInputStream.new(@output)
+ else
+ MemoryMappedInputStream.new(@output)
+ end
+ end
+
def load_raw(input, reader)
schema = reader.schema
chunked_arrays = []
@@ -100,7 +112,7 @@ module Arrow
RecordBatchStreamReader,
]
reader_class_candidates.each do |reader_class_candidate|
- input = MemoryMappedInputStream.new(@path)
+ input = open_input_stream
begin
reader = reader_class_candidate.new(input)
rescue Arrow::Error
@@ -114,20 +126,20 @@ module Arrow
end
def load_as_batch
- input = MemoryMappedInputStream.new(@path)
+ input = open_input_stream
reader = RecordBatchFileReader.new(input)
load_raw(input, reader)
end
def load_as_stream
- input = MemoryMappedInputStream.new(@path)
+ input = open_input_stream
reader = RecordBatchStreamReader.new(input)
load_raw(input, reader)
end
if Arrow.const_defined?(:ORCFileReader)
def load_as_orc
- input = MemoryMappedInputStream.new(@path)
+ input = open_input_stream
reader = ORCFileReader.new(input)
field_indexes = @options[:field_indexes]
reader.set_field_indexes(field_indexes) if field_indexes
@@ -140,11 +152,15 @@ module Arrow
def load_as_csv
options = @options.dup
options.delete(:format)
- CSVLoader.load(Pathname.new(@path), options)
+ if @output.is_a?(Buffer)
+ CSVLoader.load(@output.data.to_s, options)
+ else
+ CSVLoader.load(Pathname.new(@output), options)
+ end
end
def load_as_feather
- input = MemoryMappedInputStream.new(@path)
+ input = open_input_stream
reader = FeatherFileReader.new(input)
table = reader.read
table.instance_variable_set(:@input, input)
diff --git a/ruby/red-arrow/lib/arrow/table-saver.rb b/ruby/red-arrow/lib/arrow/table-saver.rb
index 99e6e49..817cc54 100644
--- a/ruby/red-arrow/lib/arrow/table-saver.rb
+++ b/ruby/red-arrow/lib/arrow/table-saver.rb
@@ -18,15 +18,15 @@
module Arrow
class TableSaver
class << self
- def save(table, path, options={})
- new(table, path, options).save
+ def save(table, output, options={})
+ new(table, output, options).save
end
end
- def initialize(table, path, options={})
+ def initialize(table, output, options={})
@table = table
- path = path.to_path if path.respond_to?(:to_path)
- @path = path
+ output = output.to_path if output.respond_to?(:to_path)
+ @output = output
@options = options
fill_options
end
@@ -51,7 +51,7 @@ module Arrow
__send__(custom_save_method)
else
# For backward compatibility.
- __send__(custom_save_method, @path)
+ __send__(custom_save_method, @output)
end
end
@@ -61,11 +61,15 @@ module Arrow
return
end
- extension = PathExtension.new(@path)
- info = extension.extract
+ if @output.is_a?(Buffer)
+ info = {}
+ else
+ extension = PathExtension.new(@output)
+ info = extension.extract
+ end
format = info[:format]
@options = @options.dup
- if respond_to?("save_as_#{format}", true)
+ if format and respond_to?("save_as_#{format}", true)
@options[:format] ||= format.to_sym
else
@options[:format] ||= :arrow
@@ -75,8 +79,30 @@ module Arrow
end
end
+ def open_raw_output_stream(&block)
+ if @output.is_a?(Buffer)
+ BufferOutputStream.open(@output, &block)
+ else
+ FileOutputStream.open(@output, false, &block)
+ end
+ end
+
+ def open_output_stream(&block)
+ compression = @options[:compression]
+ if compression
+ codec = Codec.new(compression)
+ open_raw_output_stream do |raw_output|
+ CompressedOutputStream.open(codec, raw_output) do |output|
+ yield(output)
+ end
+ end
+ else
+ open_raw_output_stream(&block)
+ end
+ end
+
def save_raw(writer_class)
- FileOutputStream.open(@path, false) do |output|
+ open_output_stream do |output|
writer_class.open(output, @table.schema) do |writer|
writer.write_table(@table)
end
@@ -95,24 +121,8 @@ module Arrow
save_raw(RecordBatchStreamWriter)
end
- def open_output
- compression = @options[:compression]
- if compression
- codec = Codec.new(compression)
- FileOutputStream.open(@path, false) do |raw_output|
- CompressedOutputStream.open(codec, raw_output) do |output|
- yield(output)
- end
- end
- else
- ::File.open(@path, "w") do |output|
- yield(output)
- end
- end
- end
-
def save_as_csv
- open_output do |output|
+ open_output_stream do |output|
csv = CSV.new(output)
names = @table.schema.fields.collect(&:name)
csv << names
@@ -125,7 +135,7 @@ module Arrow
end
def save_as_feather
- FileOutputStream.open(@path, false) do |output|
+ open_output_stream do |output|
FeatherFileWriter.open(output) do |writer|
writer.write(@table)
end
diff --git a/ruby/red-arrow/test/test-table.rb b/ruby/red-arrow/test/test-table.rb
index 1576f77..2876f76 100644
--- a/ruby/red-arrow/test/test-table.rb
+++ b/ruby/red-arrow/test/test-table.rb
@@ -395,83 +395,128 @@ class TableTest < Test::Unit::TestCase
end
sub_test_case("#save and .load") do
- sub_test_case(":format") do
- test("default") do
- file = Tempfile.new(["red-arrow", ".arrow"])
- @table.save(file.path)
- assert_equal(@table, Arrow::Table.load(file.path))
+ module SaveLoadFormatTests
+ def test_default
+ output = create_output(".arrow")
+ @table.save(output)
+ assert_equal(@table, Arrow::Table.load(output))
end
- test(":batch") do
- file = Tempfile.new(["red-arrow", ".arrow"])
- @table.save(file.path, :format => :batch)
- assert_equal(@table, Arrow::Table.load(file.path, :format => :batch))
+ def test_batch
+ output = create_output(".arrow")
+ @table.save(output, format: :batch)
+ assert_equal(@table, Arrow::Table.load(output, format: :batch))
end
- test(":stream") do
- file = Tempfile.new(["red-arrow", ".arrow"])
- @table.save(file.path, :format => :stream)
- assert_equal(@table, Arrow::Table.load(file.path, :format => :stream))
+ def test_stream
+ output = create_output(".arrow")
+ @table.save(output, format: :stream)
+ assert_equal(@table, Arrow::Table.load(output, format: :stream))
end
- test(":csv") do
- file = Tempfile.new(["red-arrow", ".csv"])
- @table.save(file.path, :format => :csv)
+ def test_csv
+ output = create_output(".csv")
+ @table.save(output, format: :csv)
assert_equal(@table,
- Arrow::Table.load(file.path,
- :format => :csv,
- :schema => @table.schema))
+ Arrow::Table.load(output,
+ format: :csv,
+ schema: @table.schema))
end
- test("csv.gz") do
- file = Tempfile.new(["red-arrow", ".csv.gz"])
- @table.save(file.path)
+ def test_csv_gz
+ output = create_output(".csv.gz")
+ @table.save(output,
+ format: :csv,
+ compression: :gzip)
assert_equal(@table,
- Arrow::Table.load(file.path,
- :format => :csv,
- :compression => :gzip,
- :schema => @table.schema))
+ Arrow::Table.load(output,
+ format: :csv,
+ compression: :gzip,
+ schema: @table.schema))
end
+ end
+
+ sub_test_case("path") do
+ sub_test_case(":format") do
+ include SaveLoadFormatTests
- sub_test_case("load: auto detect") do
- test("batch") do
- file = Tempfile.new(["red-arrow", ".arrow"])
- @table.save(file.path, :format => :batch)
- assert_equal(@table, Arrow::Table.load(file.path))
+ def create_output(extension)
+ @file = Tempfile.new(["red-arrow", extension])
+ @file.path
end
- test("stream") do
- file = Tempfile.new(["red-arrow", ".arrow"])
- @table.save(file.path, :format => :stream)
- assert_equal(@table, Arrow::Table.load(file.path))
+ sub_test_case("save: auto detect") do
+ test("csv") do
+ output = create_output(".csv")
+ @table.save(output)
+ assert_equal(@table,
+ Arrow::Table.load(output,
+ format: :csv,
+ schema: @table.schema))
+ end
+
+ test("csv.gz") do
+ output = create_output(".csv.gz")
+ @table.save(output)
+ assert_equal(@table,
+ Arrow::Table.load(output,
+ format: :csv,
+ compression: :gzip,
+ schema: @table.schema))
+ end
end
- test("csv") do
- path = fixture_path("with-header.csv")
- assert_equal(<<-TABLE, Arrow::Table.load(path, skip_lines: /^#/).to_s)
+ sub_test_case("load: auto detect") do
+ test("batch") do
+ output = create_output(".arrow")
+ @table.save(output, format: :batch)
+ assert_equal(@table, Arrow::Table.load(output))
+ end
+
+ test("stream") do
+ output = create_output(".arrow")
+ @table.save(output, format: :stream)
+ assert_equal(@table, Arrow::Table.load(output))
+ end
+
+ test("csv") do
+ path = fixture_path("with-header.csv")
+ table = Arrow::Table.load(path, skip_lines: /^\#/)
+ assert_equal(<<-TABLE, table.to_s)
name score
0 alice 10
1 bob 29
2 chris -1
- TABLE
- end
+ TABLE
+ end
- test("csv.gz") do
- file = Tempfile.new(["red-arrow", ".csv.gz"])
- Zlib::GzipWriter.wrap(file) do |gz|
- gz.write(<<-CSV)
+ test("csv.gz") do
+ file = Tempfile.new(["red-arrow", ".csv.gz"])
+ Zlib::GzipWriter.wrap(file) do |gz|
+ gz.write(<<-CSV)
name,score
alice,10
bob,29
chris,-1
- CSV
- end
- assert_equal(<<-TABLE, Arrow::Table.load(file.path).to_s)
+ CSV
+ end
+ assert_equal(<<-TABLE, Arrow::Table.load(file.path).to_s)
name score
0 alice 10
1 bob 29
2 chris -1
TABLE
+ end
+ end
+ end
+ end
+
+ sub_test_case("Buffer") do
+ sub_test_case(":format") do
+ include SaveLoadFormatTests
+
+ def create_output(extension)
+ Arrow::ResizableBuffer.new(1024)
end
end
end
diff --git a/ruby/red-parquet/lib/parquet/arrow-table-loadable.rb b/ruby/red-parquet/lib/parquet/arrow-table-loadable.rb
index 4df527b..e3aa1ce 100644
--- a/ruby/red-parquet/lib/parquet/arrow-table-loadable.rb
+++ b/ruby/red-parquet/lib/parquet/arrow-table-loadable.rb
@@ -19,9 +19,12 @@ module Parquet
module ArrowTableLoadable
private
def load_as_parquet
- reader = Parquet::ArrowFileReader.new(@path)
+ input = open_input_stream
+ reader = Parquet::ArrowFileReader.new(input)
reader.use_threads = (@options[:use_threads] != false)
- reader.read_table
+ table = reader.read_table
+ table.instance_variable_set(:@input, input)
+ table
end
end
end
diff --git a/ruby/red-parquet/lib/parquet/arrow-table-savable.rb b/ruby/red-parquet/lib/parquet/arrow-table-savable.rb
index 5d96d5f..7667381 100644
--- a/ruby/red-parquet/lib/parquet/arrow-table-savable.rb
+++ b/ruby/red-parquet/lib/parquet/arrow-table-savable.rb
@@ -20,8 +20,10 @@ module Parquet
private
def save_as_parquet
chunk_size = @options[:chunk_size] || 1024 # TODO
- Parquet::ArrowFileWriter.open(@table.schema, @path) do |writer|
- writer.write_table(@table, chunk_size)
+ open_output_stream do |output|
+ Parquet::ArrowFileWriter.open(@table.schema, output) do |writer|
+ writer.write_table(@table, chunk_size)
+ end
end
end
end
diff --git a/ruby/red-parquet/test/test-arrow-table.rb b/ruby/red-parquet/test/test-arrow-table.rb
index 258b417..1a565b6 100644
--- a/ruby/red-parquet/test/test-arrow-table.rb
+++ b/ruby/red-parquet/test/test-arrow-table.rb
@@ -40,9 +40,15 @@ class TestArrowTableReader < Test::Unit::TestCase
@table = Arrow::Table.new(schema, [@count_column, @visible_column])
end
- def test_save_load
+ def test_save_load_path
tempfile = Tempfile.open(["red-parquet", ".parquet"])
@table.save(tempfile.path)
assert_equal(@table, Arrow::Table.load(tempfile.path))
end
+
+ def test_save_load_buffer
+ buffer = Arrow::ResizableBuffer.new(1024)
+ @table.save(buffer, format: :parquet)
+ assert_equal(@table, Arrow::Table.load(buffer, format: :parquet))
+ end
end