[
https://issues.apache.org/jira/browse/AVRO-1695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16678828#comment-16678828
]
ASF GitHub Bot commented on AVRO-1695:
--------------------------------------
Fokko closed pull request #116: AVRO-1695: Ruby support for logical types
revisited
URL: https://github.com/apache/avro/pull/116
This is a PR merged from a forked repository.
Because GitHub does not display the original diff of a pull request from a
fork once it has been merged, the diff is reproduced below for the sake of
provenance:
diff --git a/lang/ruby/Manifest b/lang/ruby/Manifest
index 87bfd98aa..9fc48c2a3 100644
--- a/lang/ruby/Manifest
+++ b/lang/ruby/Manifest
@@ -9,6 +9,7 @@ lib/avro.rb
lib/avro/data_file.rb
lib/avro/io.rb
lib/avro/ipc.rb
+lib/avro/logical_types.rb
lib/avro/protocol.rb
lib/avro/schema.rb
lib/avro/schema_compatibility.rb
@@ -24,6 +25,7 @@ test/test_datafile.rb
test/test_fingerprints.rb
test/test_help.rb
test/test_io.rb
+test/test_logical_types.rb
test/test_protocol.rb
test/test_schema.rb
test/test_schema_compatibility.rb
diff --git a/lang/ruby/lib/avro/io.rb b/lang/ruby/lib/avro/io.rb
index b04a19a78..26bda973a 100644
--- a/lang/ruby/lib/avro/io.rb
+++ b/lang/ruby/lib/avro/io.rb
@@ -254,7 +254,7 @@ def read_data(writers_schema, readers_schema, decoder)
# function dispatch for reading data based on type of writer's
# schema
- case writers_schema.type_sym
+ datum = case writers_schema.type_sym
when :null; decoder.read_null
when :boolean; decoder.read_boolean
when :string; decoder.read_string
@@ -272,6 +272,8 @@ def read_data(writers_schema, readers_schema, decoder)
else
raise AvroError, "Cannot read unknown schema type:
#{writers_schema.type}"
end
+
+ readers_schema.type_adapter.decode(datum)
end
def read_fixed(writers_schema, readers_schema, decoder)
@@ -499,8 +501,10 @@ def write(datum, encoder)
write_data(writers_schema, datum, encoder)
end
- def write_data(writers_schema, datum, encoder)
- unless Schema.validate(writers_schema, datum)
+ def write_data(writers_schema, logical_datum, encoder)
+ datum = writers_schema.type_adapter.encode(logical_datum)
+
+ unless Schema.validate(writers_schema, datum, encoded = true)
raise AvroTypeError.new(writers_schema, datum)
end
diff --git a/lang/ruby/lib/avro/logical_types.rb
b/lang/ruby/lib/avro/logical_types.rb
new file mode 100644
index 000000000..e1b219d72
--- /dev/null
+++ b/lang/ruby/lib/avro/logical_types.rb
@@ -0,0 +1,90 @@
+# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'date'
+
+module Avro
+ module LogicalTypes
+ module IntDate
+ EPOCH_START = Date.new(1970, 1, 1)
+
+ def self.encode(date)
+ return date.to_i if date.is_a?(Numeric)
+
+ (date - EPOCH_START).to_i
+ end
+
+ def self.decode(int)
+ EPOCH_START + int
+ end
+ end
+
+ module TimestampMillis
+ def self.encode(value)
+ return value.to_i if value.is_a?(Numeric)
+
+ time = value.to_time
+ time.to_i * 1000 + time.usec / 1000
+ end
+
+ def self.decode(int)
+ s, ms = int / 1000, int % 1000
+ Time.at(s, ms * 1000).utc
+ end
+ end
+
+ module TimestampMicros
+ def self.encode(value)
+ return value.to_i if value.is_a?(Numeric)
+
+ time = value.to_time
+ time.to_i * 1000_000 + time.usec
+ end
+
+ def self.decode(int)
+ s, us = int / 1000_000, int % 1000_000
+ Time.at(s, us).utc
+ end
+ end
+
+ module Identity
+ def self.encode(datum)
+ datum
+ end
+
+ def self.decode(datum)
+ datum
+ end
+ end
+
+ TYPES = {
+ "int" => {
+ "date" => IntDate
+ },
+ "long" => {
+ "timestamp-millis" => TimestampMillis,
+ "timestamp-micros" => TimestampMicros
+ },
+ }.freeze
+
+ def self.type_adapter(type, logical_type)
+ return unless logical_type
+
+ TYPES.fetch(type, {}.freeze).fetch(logical_type, Identity)
+ end
+ end
+end
diff --git a/lang/ruby/lib/avro/schema.rb b/lang/ruby/lib/avro/schema.rb
index 024d56230..3acd07b7f 100644
--- a/lang/ruby/lib/avro/schema.rb
+++ b/lang/ruby/lib/avro/schema.rb
@@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+require 'avro/logical_types'
+
module Avro
class Schema
# Sets of strings, for backwards compatibility. See below for sets of
symbols,
@@ -40,6 +42,7 @@ def self.parse(json_string)
def self.real_parse(json_obj, names=nil, default_namespace=nil)
if json_obj.is_a? Hash
type = json_obj['type']
+ logical_type = json_obj['logicalType']
raise SchemaParseError, %Q(No "type" property: #{json_obj}) if
type.nil?
# Check that the type is valid before calling #to_sym, since symbols
are never garbage
@@ -50,7 +53,7 @@ def self.real_parse(json_obj, names=nil,
default_namespace=nil)
type_sym = type.to_sym
if PRIMITIVE_TYPES_SYM.include?(type_sym)
- return PrimitiveSchema.new(type_sym)
+ return PrimitiveSchema.new(type_sym, logical_type)
elsif NAMED_TYPES_SYM.include? type_sym
name = json_obj['name']
@@ -58,7 +61,7 @@ def self.real_parse(json_obj, names=nil,
default_namespace=nil)
case type_sym
when :fixed
size = json_obj['size']
- return FixedSchema.new(name, namespace, size, names)
+ return FixedSchema.new(name, namespace, size, names, logical_type)
when :enum
symbols = json_obj['symbols']
doc = json_obj['doc']
@@ -93,23 +96,29 @@ def self.real_parse(json_obj, names=nil,
default_namespace=nil)
end
# Determine if a ruby datum is an instance of a schema
- def self.validate(expected_schema, datum)
- SchemaValidator.validate!(expected_schema, datum)
+ def self.validate(expected_schema, logical_datum, encoded = false)
+ SchemaValidator.validate!(expected_schema, logical_datum, encoded)
true
rescue SchemaValidator::ValidationError
false
end
- def initialize(type)
+ def initialize(type, logical_type=nil)
@type_sym = type.is_a?(Symbol) ? type : type.to_sym
+ @logical_type = logical_type
end
attr_reader :type_sym
+ attr_reader :logical_type
# Returns the type as a string (rather than a symbol), for backwards
compatibility.
# Deprecated in favor of {#type_sym}.
def type; @type_sym.to_s; end
+ def type_adapter
+ @type_adapter ||= LogicalTypes.type_adapter(type, logical_type) ||
LogicalTypes::Identity
+ end
+
# Returns the MD5 fingerprint of the schema as an Integer.
def md5_fingerprint
parsing_form = SchemaNormalization.to_parsing_form(self)
@@ -157,7 +166,9 @@ def subparse(json_obj, names=nil, namespace=nil)
end
def to_avro(names=nil)
- {'type' => type}
+ props = {'type' => type}
+ props['logicalType'] = logical_type if logical_type
+ props
end
def to_s
@@ -166,8 +177,9 @@ def to_s
class NamedSchema < Schema
attr_reader :name, :namespace
- def initialize(type, name, namespace=nil, names=nil, doc=nil)
- super(type)
+
+ def initialize(type, name, namespace=nil, names=nil, doc=nil,
logical_type=nil)
+ super(type, logical_type)
@name, @namespace = Name.extract_namespace(name, namespace)
@doc = doc
names = Name.add_name(names, self)
@@ -318,11 +330,11 @@ def to_avro(names=Set.new)
# Valid primitive types are in PRIMITIVE_TYPES.
class PrimitiveSchema < Schema
- def initialize(type)
+ def initialize(type, logical_type=nil)
if PRIMITIVE_TYPES_SYM.include?(type)
- super(type)
+ super(type, logical_type)
elsif PRIMITIVE_TYPES.include?(type)
- super(type.to_sym)
+ super(type.to_sym, logical_type)
else
raise AvroError.new("#{type} is not a valid primitive type.")
end
@@ -336,12 +348,12 @@ def to_avro(names=nil)
class FixedSchema < NamedSchema
attr_reader :size
- def initialize(name, space, size, names=nil)
+ def initialize(name, space, size, names=nil, logical_type=nil)
# Ensure valid cto args
unless size.is_a?(Integer)
raise AvroError, 'Fixed Schema requires a valid integer for size
property.'
end
- super(:fixed, name, space, names)
+ super(:fixed, name, space, names, logical_type)
@size = size
end
diff --git a/lang/ruby/lib/avro/schema_validator.rb
b/lang/ruby/lib/avro/schema_validator.rb
index 89b0a9c1e..67464fbfe 100644
--- a/lang/ruby/lib/avro/schema_validator.rb
+++ b/lang/ruby/lib/avro/schema_validator.rb
@@ -62,16 +62,22 @@ def to_s
TypeMismatchError = Class.new(ValidationError)
class << self
- def validate!(expected_schema, datum)
+ def validate!(expected_schema, logical_datum, encoded = false)
result = Result.new
- validate_recursive(expected_schema, datum, ROOT_IDENTIFIER, result)
+ validate_recursive(expected_schema, logical_datum, ROOT_IDENTIFIER,
result, encoded)
fail ValidationError, result if result.failure?
result
end
private
- def validate_recursive(expected_schema, datum, path, result)
+ def validate_recursive(expected_schema, logical_datum, path, result,
encoded = false)
+ datum = if encoded
+ logical_datum
+ else
+ expected_schema.type_adapter.encode(logical_datum) rescue nil
+ end
+
case expected_schema.type_sym
when :null
fail TypeMismatchError unless datum.nil?
diff --git a/lang/ruby/test/random_data.rb b/lang/ruby/test/random_data.rb
index 9d276f7d2..54fa8781d 100644
--- a/lang/ruby/test/random_data.rb
+++ b/lang/ruby/test/random_data.rb
@@ -27,15 +27,17 @@ def next
end
def nextdata(schm, d=0)
+ return logical_nextdata(schm, d=0) unless
schm.type_adapter.eql?(Avro::LogicalTypes::Identity)
+
case schm.type_sym
when :boolean
rand > 0.5
when :string
randstr()
when :int
- rand(Avro::Schema::INT_MAX_VALUE - Avro::Schema::INT_MIN_VALUE) +
Avro::Schema::INT_MIN_VALUE
+ rand_int
when :long
- rand(Avro::Schema::LONG_MAX_VALUE - Avro::Schema::LONG_MIN_VALUE) +
Avro::Schema::LONG_MIN_VALUE
+ rand_long
when :float
(-1024 + 2048 * rand).round.to_f
when :double
@@ -79,6 +81,15 @@ def nextdata(schm, d=0)
end
end
+ def logical_nextdata(schm, _d=0)
+ case schm.logical_type
+ when 'date'
+ Avro::LogicalTypes::IntDate.decode(rand_int)
+ when 'timestamp-millis', 'timestamp-micros'
+ Avro::LogicalTypes::TimestampMicros.decode(rand_long)
+ end
+ end
+
CHARPOOL = 'abcdefghjkmnpqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ23456789'
BYTEPOOL = '12345abcd'
@@ -87,4 +98,12 @@ def randstr(chars=CHARPOOL, length=20)
rand(length+1).times { str << chars[rand(chars.size)] }
str
end
+
+ def rand_int
+ rand(Avro::Schema::INT_MAX_VALUE - Avro::Schema::INT_MIN_VALUE) +
Avro::Schema::INT_MIN_VALUE
+ end
+
+ def rand_long
+ rand(Avro::Schema::LONG_MAX_VALUE - Avro::Schema::LONG_MIN_VALUE) +
Avro::Schema::LONG_MIN_VALUE
+ end
end
diff --git a/lang/ruby/test/test_io.rb b/lang/ruby/test/test_io.rb
index fc0088b41..70bb4d60c 100644
--- a/lang/ruby/test/test_io.rb
+++ b/lang/ruby/test/test_io.rb
@@ -84,6 +84,17 @@ def test_record
check_default(record_schema, '{"f": 11}', {"f" => 11})
end
+ def test_record_with_logical_type
+ record_schema = <<EOS
+ {"type": "record",
+ "name": "Test",
+ "fields": [{"name": "ts",
+ "type": {"type": "long",
+ "logicalType": "timestamp-micros"}}]}
+EOS
+ check(record_schema)
+ end
+
def test_error
error_schema = <<EOS
{"type": "error",
@@ -115,6 +126,7 @@ def test_recursive
def test_union
union_schema = <<EOS
["string",
+ {"type": "int", "logicalType": "date"},
"null",
"long",
{"type": "record",
@@ -451,7 +463,7 @@ def check(str)
def checkser(schm, randomdata)
datum = randomdata.next
- assert validate(schm, datum)
+ assert validate(schm, datum), 'datum is not valid for schema'
w = Avro::IO::DatumWriter.new(schm)
writer = StringIO.new "", "w"
w.write(datum, Avro::IO::BinaryEncoder.new(writer))
diff --git a/lang/ruby/test/test_logical_types.rb
b/lang/ruby/test/test_logical_types.rb
new file mode 100644
index 000000000..5416e117d
--- /dev/null
+++ b/lang/ruby/test/test_logical_types.rb
@@ -0,0 +1,120 @@
+# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'test_help'
+
+class TestLogicalTypes < Test::Unit::TestCase
+ def test_int_date
+ schema = Avro::Schema.parse <<-SCHEMA
+ { "type": "int", "logicalType": "date" }
+ SCHEMA
+
+ assert_equal 'date', schema.logical_type
+ today = Date.today
+ assert_encode_and_decode today, schema
+ assert_preencoded Avro::LogicalTypes::IntDate.encode(today), schema, today
+ end
+
+ def test_int_date_conversion
+ type = Avro::LogicalTypes::IntDate
+
+ assert_equal 5, type.encode(Date.new(1970, 1, 6))
+ assert_equal 0, type.encode(Date.new(1970, 1, 1))
+ assert_equal -5, type.encode(Date.new(1969, 12, 27))
+
+ assert_equal Date.new(1970, 1, 6), type.decode(5)
+ assert_equal Date.new(1970, 1, 1), type.decode(0)
+ assert_equal Date.new(1969, 12, 27), type.decode(-5)
+ end
+
+ def test_timestamp_millis_long
+ schema = Avro::Schema.parse <<-SCHEMA
+ { "type": "long", "logicalType": "timestamp-millis" }
+ SCHEMA
+
+ # The Time.at format is (seconds, microseconds) since Epoch.
+ time = Time.at(628232400, 12000)
+
+ assert_equal 'timestamp-millis', schema.logical_type
+ assert_encode_and_decode time, schema
+ assert_preencoded Avro::LogicalTypes::TimestampMillis.encode(time),
schema, time.utc
+ end
+
+ def test_timestamp_millis_long_conversion
+ type = Avro::LogicalTypes::TimestampMillis
+
+ now = Time.now.utc
+ now_millis = Time.utc(now.year, now.month, now.day, now.hour, now.min,
now.sec, now.usec / 1000 * 1000)
+
+ assert_equal now_millis, type.decode(type.encode(now_millis))
+ assert_equal 1432849613221, type.encode(Time.utc(2015, 5, 28, 21, 46, 53,
221000))
+ assert_equal 1432849613221, type.encode(DateTime.new(2015, 5, 28, 21, 46,
53.221))
+ assert_equal Time.utc(2015, 5, 28, 21, 46, 53, 221000),
type.decode(1432849613221)
+ end
+
+ def test_timestamp_micros_long
+ schema = Avro::Schema.parse <<-SCHEMA
+ { "type": "long", "logicalType": "timestamp-micros" }
+ SCHEMA
+
+ # The Time.at format is (seconds, microseconds) since Epoch.
+ time = Time.at(628232400, 12345)
+
+ assert_equal 'timestamp-micros', schema.logical_type
+ assert_encode_and_decode time, schema
+ assert_preencoded Avro::LogicalTypes::TimestampMicros.encode(time),
schema, time.utc
+ end
+
+ def test_timestamp_micros_long_conversion
+ type = Avro::LogicalTypes::TimestampMicros
+
+ now = Time.now.utc
+
+ assert_equal Time.utc(now.year, now.month, now.day, now.hour, now.min,
now.sec, now.usec), type.decode(type.encode(now))
+ assert_equal 1432849613221843, type.encode(Time.utc(2015, 5, 28, 21, 46,
53, 221843))
+ assert_equal 1432849613221843, type.encode(DateTime.new(2015, 5, 28, 21,
46, 53.221843))
+ assert_equal Time.utc(2015, 5, 28, 21, 46, 53, 221843),
type.decode(1432849613221843)
+ end
+
+ def encode(datum, schema)
+ buffer = StringIO.new("")
+ encoder = Avro::IO::BinaryEncoder.new(buffer)
+
+ datum_writer = Avro::IO::DatumWriter.new(schema)
+ datum_writer.write(datum, encoder)
+
+ buffer.string
+ end
+
+ def decode(encoded, schema)
+ buffer = StringIO.new(encoded)
+ decoder = Avro::IO::BinaryDecoder.new(buffer)
+
+ datum_reader = Avro::IO::DatumReader.new(schema, schema)
+ datum_reader.read(decoder)
+ end
+
+ def assert_encode_and_decode(datum, schema)
+ encoded = encode(datum, schema)
+ assert_equal datum, decode(encoded, schema)
+ end
+
+ def assert_preencoded(datum, schema, decoded)
+ encoded = encode(datum, schema)
+ assert_equal decoded, decode(encoded, schema)
+ end
+end
diff --git a/lang/ruby/test/test_schema.rb b/lang/ruby/test/test_schema.rb
index 48fe0a554..66ea77b75 100644
--- a/lang/ruby/test/test_schema.rb
+++ b/lang/ruby/test/test_schema.rb
@@ -132,6 +132,21 @@ def test_to_avro_includes_namespaces
}, schema.to_avro)
end
+ def test_to_avro_includes_logical_type
+ schema = Avro::Schema.parse <<-SCHEMA
+ {"type": "record", "name": "has_logical", "fields": [
+ {"name": "dt", "type": {"type": "int", "logicalType": "date"}}]
+ }
+ SCHEMA
+
+ assert_equal schema.to_avro, {
+ 'type' => 'record', 'name' => 'has_logical',
+ 'fields' => [
+ {'name' => 'dt', 'type' => {'type' => 'int', 'logicalType' => 'date'}}
+ ]
+ }
+ end
+
def test_unknown_named_type
error = assert_raise Avro::UnknownSchemaError do
Avro::Schema.parse <<-SCHEMA
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Add LogicalType support to the Ruby library
> -------------------------------------------
>
> Key: AVRO-1695
> URL: https://issues.apache.org/jira/browse/AVRO-1695
> Project: Apache Avro
> Issue Type: New Feature
> Components: ruby
> Reporter: Daniel Schierbeck
> Assignee: Martin Jubelgas
> Priority: Major
> Fix For: 1.9.0
>
>
> It would be nice if the Ruby library had feature parity. I would be willing
> to contribute work if someone could review and merge the code.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)