This is an automated email from the ASF dual-hosted git repository.
tjwp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/master by this push:
new df51e78 AVRO-3054: Fix / support decimal logical type in Ruby (#1102)
df51e78 is described below
commit df51e78752030c5ac929bd9adbb6fe1739a28485
Author: Andrew Thauer <[email protected]>
AuthorDate: Sat May 15 10:43:20 2021 -0400
AVRO-3054: Fix / support decimal logical type in Ruby (#1102)
Co-authored-by: Johannes Vetter <[email protected]>
Co-authored-by: Keith Gable <[email protected]>
---
lang/ruby/Gemfile | 4 +-
lang/ruby/lib/avro/logical_types.rb | 187 +++++++++++++++++++++++++++++++-
lang/ruby/lib/avro/schema.rb | 22 +++-
lang/ruby/test/test_logical_types.rb | 136 +++++++++++++++++++++++
lang/ruby/test/test_schema.rb | 67 +++++++++++-
lang/ruby/test/test_schema_validator.rb | 14 +++
6 files changed, 420 insertions(+), 10 deletions(-)
diff --git a/lang/ruby/Gemfile b/lang/ruby/Gemfile
index 9756e0f..45ae0af 100644
--- a/lang/ruby/Gemfile
+++ b/lang/ruby/Gemfile
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-source 'https://rubygems.org'
+source 'https://rubygems.org'
gem 'rake'
gem 'echoe'
gem 'multi_json'
@@ -25,6 +25,8 @@ gem 'parallel'
# webrick is no longer included with Ruby 3.0+
gem 'webrick'
+gem 'memory_profiler'
+
# rubocop v1.0 and later introduces new Lint cops to be addressed
gem 'rubocop', '< 1.0'
gem 'rdoc'
diff --git a/lang/ruby/lib/avro/logical_types.rb b/lang/ruby/lib/avro/logical_types.rb
index 13babfc..1d1fb6f 100644
--- a/lang/ruby/lib/avro/logical_types.rb
+++ b/lang/ruby/lib/avro/logical_types.rb
@@ -17,9 +17,188 @@
# limitations under the License.
require 'date'
+require 'bigdecimal'
+require 'bigdecimal/util'
module Avro
module LogicalTypes
+ ##
+ # Base class for logical types requiring a schema to be present
+ class LogicalTypeWithSchema
+ ##
+ # @return [Avro::Schema] The schema this logical type is dealing with
+ attr_reader :schema
+
+ ##
+ # Build a new instance of a logical type using the provided schema
+ #
+ # @param schema [Avro::Schema]
+ # The schema to use with this instance
+ #
+ # @raise [ArgumentError]
+ # If the provided schema is nil
+ def initialize(schema)
+ raise ArgumentError, 'schema is required' if schema.nil?
+
+ @schema = schema
+ end
+
+ ##
+ # Encode the provided datum
+ #
+ # @param datum [Object] The datum to encode
+ #
+ # @raise [NotImplementedError]
+ # Subclass will need to override this method
+ def encode(datum)
+ raise NotImplementedError
+ end
+
+ ##
+ # Decode the provided datum
+ #
+ # @param datum [Object] The datum to decode
+ #
+ # @raise [NotImplementedError]
+ # Subclass will need to override this method
+ def decode(datum)
+ raise NotImplementedError
+ end
+ end
+
+ ##
+ # Logical type to handle arbitrary-precision decimals using byte array.
+ #
+ # The byte array contains the two's-complement representation of the unscaled integer
+ # value in big-endian byte order.
+ class BytesDecimal < LogicalTypeWithSchema
+ # Messages for exceptions
+ ERROR_INSUFFICIENT_PRECISION = 'Precision is too small'
+ ERROR_ROUNDING_NECESSARY = 'Rounding necessary'
+ ERROR_VALUE_MUST_BE_NUMERIC = 'value must be numeric'
+
+ # The pattern used to pack up the byte array (8 bit unsigned integer/char)
+ PACK_UNSIGNED_CHARS = 'C*'
+
+ # The number 10 as BigDecimal
+ TEN = BigDecimal(10).freeze
+
+ ##
+ # @return [Integer] The number of total digits supported by the decimal
+ attr_reader :precision
+
+ ##
+ # @return [Integer] The number of fractional digits
+ attr_reader :scale
+
+ ##
+ # Build a new decimal logical type
+ #
+ # @param schema [Avro::Schema]
+ # The schema defining precision and scale for the conversion
+ def initialize(schema)
+ super
+
+ @scale = schema.scale.to_i
+ @precision = schema.precision.to_i
+ @factor = TEN ** @scale
+ end
+
+ ##
+ # Encode the provided value into a byte array
+ #
+ # @param value [BigDecimal, Float, Integer]
+ # The numeric value to encode
+ #
+ # @raise [ArgumentError]
+ # If the provided value is not a numeric type
+ #
+ # @raise [RangeError]
+ # If the provided value has a scale higher than the schema permits,
+ # or does not fit into the schema's precision
+ def encode(value)
+ raise ArgumentError, ERROR_VALUE_MUST_BE_NUMERIC unless
value.is_a?(Numeric)
+
+
to_byte_array(unscaled_value(value.to_d)).pack(PACK_UNSIGNED_CHARS).freeze
+ end
+
+ ##
+ # Decode a byte array (in form of a string) into a BigDecimal of the
+ # given precision and scale
+ #
+ # @param stream [String]
+ # The byte array to decode
+ #
+ # @return [BigDecimal]
+ def decode(stream)
+ from_byte_array(stream) / @factor
+ end
+
+ private
+
+ ##
+ # Convert the provided stream of bytes into the unscaled value
+ #
+ # @param stream [String]
+ # The stream of bytes to convert
+ #
+ # @return [Integer]
+ def from_byte_array(stream)
+ bytes = stream.bytes
+ positive = bytes.first[7].zero?
+ total = 0
+
+ bytes.each_with_index do |value, ix|
+ total += (positive ? value : (value ^ 0xff)) << (bytes.length - ix -
1) * 8
+ end
+
+ return total if positive
+
+ -(total + 1)
+ end
+
+ ##
+ # Convert the provided number into its two's complement representation
+ # in network order (big endian).
+ #
+ # @param number [Integer]
+ # The number to convert
+ #
+ # @return [Array<Integer>]
+ # The byte array in network order
+ def to_byte_array(number)
+ [].tap do |result|
+ loop do
+ result.unshift(number & 0xff)
+ number >>= 8
+
+ break if (number == 0 || number == -1) && (result.first[7] ==
number[7])
+ end
+ end
+ end
+
+ ##
+ # Get the unscaled value from a BigDecimal considering the schema's scale
+ #
+ # @param decimal [BigDecimal]
+ # The decimal to get the unscaled value from
+ #
+ # @return [Integer]
+ def unscaled_value(decimal)
+ details = decimal.split
+ length = details[1].length
+
+ fractional_part = length - details[3]
+ raise RangeError, ERROR_ROUNDING_NECESSARY if fractional_part > scale
+
+ if length > precision || (length - fractional_part) > (precision -
scale)
+ raise RangeError, ERROR_INSUFFICIENT_PRECISION
+ end
+
+ (decimal * @factor).to_i
+ end
+ end
+
module IntDate
EPOCH_START = Date.new(1970, 1, 1)
@@ -73,6 +252,9 @@ module Avro
end
TYPES = {
+ "bytes" => {
+ "decimal" => BytesDecimal
+ },
"int" => {
"date" => IntDate
},
@@ -82,10 +264,11 @@ module Avro
},
}.freeze
- def self.type_adapter(type, logical_type)
+ def self.type_adapter(type, logical_type, schema = nil)
return unless logical_type
- TYPES.fetch(type, {}.freeze).fetch(logical_type, Identity)
+ adapter = TYPES.fetch(type, {}.freeze).fetch(logical_type, Identity)
+ adapter.is_a?(Class) ? adapter.new(schema) : adapter
end
end
end
diff --git a/lang/ruby/lib/avro/schema.rb b/lang/ruby/lib/avro/schema.rb
index eba379f..a16dd86 100644
--- a/lang/ruby/lib/avro/schema.rb
+++ b/lang/ruby/lib/avro/schema.rb
@@ -136,7 +136,7 @@ module Avro
def type; @type_sym.to_s; end
def type_adapter
- @type_adapter ||= LogicalTypes.type_adapter(type, logical_type) ||
LogicalTypes::Identity
+ @type_adapter ||= LogicalTypes.type_adapter(type, logical_type, self) ||
LogicalTypes::Identity
end
# Returns the MD5 fingerprint of the schema as an Integer.
@@ -484,11 +484,19 @@ module Avro
end
class BytesSchema < PrimitiveSchema
+ ERROR_INVALID_SCALE = 'Scale must be greater than or equal to 0'
+ ERROR_INVALID_PRECISION = 'Precision must be positive'
+ ERROR_PRECISION_TOO_SMALL = 'Precision must be greater than scale'
+
attr_reader :precision, :scale
+
def initialize(type, logical_type=nil, precision=nil, scale=nil)
super(type.to_sym, logical_type)
- @precision = precision
- @scale = scale
+
+ @precision = precision.to_i if precision
+ @scale = scale.to_i if scale
+
+ validate_decimal! if logical_type == DECIMAL_LOGICAL_TYPE
end
def to_avro(names=nil)
@@ -509,6 +517,14 @@ module Avro
false
end
+
+ private
+
+ def validate_decimal!
+ raise Avro::SchemaParseError, ERROR_INVALID_PRECISION unless
precision.to_i.positive?
+ raise Avro::SchemaParseError, ERROR_INVALID_SCALE if
scale.to_i.negative?
+ raise Avro::SchemaParseError, ERROR_PRECISION_TOO_SMALL if precision <
scale.to_i
+ end
end
class FixedSchema < NamedSchema
diff --git a/lang/ruby/test/test_logical_types.rb b/lang/ruby/test/test_logical_types.rb
index 5b1efda..9806eae 100644
--- a/lang/ruby/test/test_logical_types.rb
+++ b/lang/ruby/test/test_logical_types.rb
@@ -17,6 +17,7 @@
# limitations under the License.
require 'test_help'
+require 'memory_profiler'
class TestLogicalTypes < Test::Unit::TestCase
def test_int_date
@@ -99,6 +100,141 @@ class TestLogicalTypes < Test::Unit::TestCase
assert_equal 'duration', schema.logical_type
end
+ def test_bytes_decimal
+ schema = Avro::Schema.parse <<-SCHEMA
+ { "type": "bytes", "logicalType": "decimal", "precision": 9, "scale": 6 }
+ SCHEMA
+
+ assert_equal 'decimal', schema.logical_type
+ assert_equal 9, schema.precision
+ assert_equal 6, schema.scale
+
+ assert_encode_and_decode BigDecimal('-3.4562'), schema
+ assert_encode_and_decode BigDecimal('3.4562'), schema
+ assert_encode_and_decode 15.123, schema
+ assert_encode_and_decode 15, schema
+ assert_encode_and_decode BigDecimal('0.123456'), schema
+ assert_encode_and_decode BigDecimal('0'), schema
+ assert_encode_and_decode BigDecimal('1'), schema
+ assert_encode_and_decode BigDecimal('-1'), schema
+
+ assert_raise ArgumentError do
+ type = Avro::LogicalTypes::BytesDecimal.new(schema)
+ type.encode('1.23')
+ end
+ end
+
+ def test_bytes_decimal_range_errors
+ schema = Avro::Schema.parse <<-SCHEMA
+ { "type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2 }
+ SCHEMA
+
+ type = Avro::LogicalTypes::BytesDecimal.new(schema)
+
+ assert_raises RangeError do
+ type.encode(BigDecimal('345'))
+ end
+
+ assert_raises RangeError do
+ type.encode(BigDecimal('1.5342'))
+ end
+
+ assert_raises RangeError do
+ type.encode(BigDecimal('-1.5342'))
+ end
+
+ assert_raises RangeError do
+ type.encode(BigDecimal('-100.2'))
+ end
+
+ assert_raises RangeError do
+ type.encode(BigDecimal('-99.991'))
+ end
+ end
+
+ def test_bytes_decimal_conversion
+ schema = Avro::Schema.parse <<-SCHEMA
+ { "type": "bytes", "logicalType": "decimal", "precision": 12, "scale": 6
}
+ SCHEMA
+
+ type = Avro::LogicalTypes::BytesDecimal.new(schema)
+
+ enc = "\xcb\x43\x38".dup.force_encoding('BINARY')
+ assert_equal enc, type.encode(BigDecimal('-3.4562'))
+ assert_equal BigDecimal('-3.4562'), type.decode(enc)
+
+ assert_equal "\x34\xbc\xc8".dup.force_encoding('BINARY'),
type.encode(BigDecimal('3.4562'))
+ assert_equal BigDecimal('3.4562'),
type.decode("\x34\xbc\xc8".dup.force_encoding('BINARY'))
+
+ assert_equal "\x6a\x33\x0e\x87\x00".dup.force_encoding('BINARY'),
type.encode(BigDecimal('456123.123456'))
+ assert_equal BigDecimal('456123.123456'),
type.decode("\x6a\x33\x0e\x87\x00".dup.force_encoding('BINARY'))
+ end
+
+ def test_logical_type_with_schema
+ exception = assert_raises(ArgumentError) do
+ Avro::LogicalTypes::LogicalTypeWithSchema.new(nil)
+ end
+ assert_equal exception.to_s, 'schema is required'
+
+ schema = Avro::Schema.parse <<-SCHEMA
+ { "type": "bytes", "logicalType": "decimal", "precision": 12, "scale": 6
}
+ SCHEMA
+
+ assert_nothing_raised do
+ Avro::LogicalTypes::LogicalTypeWithSchema.new(schema)
+ end
+
+ assert_raises NotImplementedError do
+
Avro::LogicalTypes::LogicalTypeWithSchema.new(schema).encode(BigDecimal('2'))
+ end
+
+ assert_raises NotImplementedError do
+ Avro::LogicalTypes::LogicalTypeWithSchema.new(schema).decode('foo')
+ end
+ end
+
+ def test_bytes_decimal_object_allocations_encode
+ schema = Avro::Schema.parse <<-SCHEMA
+ { "type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2 }
+ SCHEMA
+
+ type = Avro::LogicalTypes::BytesDecimal.new(schema)
+
+ positive_value = BigDecimal('5.2')
+ negative_value = BigDecimal('-5.2')
+
+ [positive_value, negative_value].each do |value|
+ report = MemoryProfiler.report do
+ type.encode(value)
+ end
+
+ assert_equal 5, report.total_allocated
+ # Ruby 2.7 does not retain anything. Ruby 2.6 retains 1
+ assert_operator 1, :>=, report.total_retained
+ end
+ end
+
+ def test_bytes_decimal_object_allocations_decode
+ schema = Avro::Schema.parse <<-SCHEMA
+ { "type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2 }
+ SCHEMA
+
+ type = Avro::LogicalTypes::BytesDecimal.new(schema)
+
+ positive_enc = "\x02\b".dup.force_encoding('BINARY')
+ negative_enc = "\xFD\xF8".dup.force_encoding('BINARY')
+
+ [positive_enc, negative_enc].each do |encoded|
+ report = MemoryProfiler.report do
+ type.decode(encoded)
+ end
+
+ assert_equal 5, report.total_allocated
+ # Ruby 2.7 does not retain anything. Ruby 2.6 retains 1
+ assert_operator 1, :>=, report.total_retained
+ end
+ end
+
def encode(datum, schema)
buffer = StringIO.new
encoder = Avro::IO::BinaryEncoder.new(buffer)
diff --git a/lang/ruby/test/test_schema.rb b/lang/ruby/test/test_schema.rb
index 3b66f28..8026530 100644
--- a/lang/ruby/test/test_schema.rb
+++ b/lang/ruby/test/test_schema.rb
@@ -633,23 +633,82 @@ class TestSchema < Test::Unit::TestCase
assert_equal schema_hash, schema.to_avro
end
- def test_bytes_decimal_to_without_precision_scale
+ def test_bytes_decimal_with_string_precision_no_scale
schema = Avro::Schema.parse <<-SCHEMA
{
"type": "bytes",
- "logicalType": "decimal"
+ "logicalType": "decimal",
+ "precision": "7"
}
SCHEMA
schema_hash =
{
'type' => 'bytes',
- 'logicalType' => 'decimal'
+ 'logicalType' => 'decimal',
+ 'precision' => 7
}
assert_equal schema_hash, schema.to_avro
end
+ def test_bytes_decimal_without_precision_or_scale
+ error = assert_raise Avro::SchemaParseError do
+ Avro::Schema.parse <<-SCHEMA
+ {
+ "type": "bytes",
+ "logicalType": "decimal"
+ }
+ SCHEMA
+ end
+
+ assert_equal 'Precision must be positive', error.message
+ end
+
+ def test_bytes_decimal_to_negative_precision
+ error = assert_raise Avro::SchemaParseError do
+ Avro::Schema.parse <<-SCHEMA
+ {
+ "type": "bytes",
+ "logicalType": "decimal",
+ "precision": -1
+ }
+ SCHEMA
+ end
+
+ assert_equal 'Precision must be positive', error.message
+ end
+
+ def test_bytes_decimal_to_negative_scale
+ error = assert_raise Avro::SchemaParseError do
+ Avro::Schema.parse <<-SCHEMA
+ {
+ "type": "bytes",
+ "logicalType": "decimal",
+ "precision": 2,
+ "scale": -1
+ }
+ SCHEMA
+ end
+
+ assert_equal 'Scale must be greater than or equal to 0', error.message
+ end
+
+ def test_bytes_decimal_with_precision_less_than_scale
+ error = assert_raise Avro::SchemaParseError do
+ Avro::Schema.parse <<-SCHEMA
+ {
+ "type": "bytes",
+ "logicalType": "decimal",
+ "precision": 3,
+ "scale": 4
+ }
+ SCHEMA
+ end
+
+ assert_equal 'Precision must be greater than scale', error.message
+ end
+
def test_bytes_schema
schema = Avro::Schema.parse <<-SCHEMA
{
@@ -715,7 +774,7 @@ class TestSchema < Test::Unit::TestCase
ensure
Avro.disable_enum_symbol_validation = nil
end
-
+
def test_validate_field_aliases
exception = assert_raise(Avro::SchemaParseError) do
hash_to_schema(
diff --git a/lang/ruby/test/test_schema_validator.rb b/lang/ruby/test/test_schema_validator.rb
index e7743bd..8d100ef 100644
--- a/lang/ruby/test/test_schema_validator.rb
+++ b/lang/ruby/test/test_schema_validator.rb
@@ -558,4 +558,18 @@ class TestSchemaValidator < Test::Unit::TestCase
assert_equal(1, exception.result.errors.size)
assert_equal("at . extra field 'color' - not in schema", exception.to_s)
end
+
+ def test_validate_bytes_decimal
+ schema = hash_to_schema(type: 'bytes', logicalType: 'decimal', precision:
4, scale: 2)
+ assert_valid_schema(schema, [BigDecimal('1.23'), 4.2, 1], ['4.2',
BigDecimal('233.2')], true)
+
+ schema = hash_to_schema(type: 'bytes', logicalType: 'decimal', precision:
4, scale: 4)
+ assert_valid_schema(schema, [BigDecimal('0.2345'), 0.2, 0.1], ['4.2',
BigDecimal('233.2')], true)
+
+ schema = hash_to_schema(type: 'bytes', logicalType: 'decimal', precision:
4, scale: 0)
+ assert_valid_schema(schema, [BigDecimal('123'), 2], ['4.2',
BigDecimal('233.2')], true)
+
+ schema = hash_to_schema(type: 'bytes', logicalType: 'decimal', precision:
4)
+ assert_valid_schema(schema, [BigDecimal('123'), 2], ['4.2',
BigDecimal('233.2')], true)
+ end
end