jecsand838 commented on code in PR #9328:
URL: https://github.com/apache/arrow-rs/pull/9328#discussion_r2776103484
##########
arrow-avro/src/reader/record.rs:
##########
@@ -504,20 +512,47 @@ impl Decoder {
};
Ok(match data_type.nullability() {
Some(nullability) => {
- // Default to reading a union branch tag unless the resolution
proves otherwise.
- let mut plan = NullablePlan::ReadTag;
- if let Some(ResolutionInfo::Union(info)) =
data_type.resolution.as_ref() {
- if !info.writer_is_union && info.reader_is_union {
- if let Some(Some((_reader_idx, promo))) =
info.writer_to_reader.first() {
- plan = NullablePlan::FromSingle { promotion:
*promo };
+ // Default to reading a union branch tag unless the resolution
directs otherwise.
+ let plan = match &data_type.resolution {
+ None => NullablePlan::ReadTag {
+ nullability,
+ resolution:
ResolutionPlan::Promotion(Promotion::Direct),
+ },
+ Some(ResolutionInfo::Promotion(_)) => {
+ // Promotions should have been incorporated
+ // into the inner decoder.
+ NullablePlan::FromSingle {
+ resolution:
ResolutionPlan::Promotion(Promotion::Direct),
}
}
- }
+ Some(ResolutionInfo::Union(info)) if !info.writer_is_union
=> {
+ let Some(Some((_, resolution))) =
info.writer_to_reader.first() else {
+ panic!(
+ "unexpected union resolution info for
non-union writer and union reader type",
+ );
+ };
+ let resolution = ResolutionPlan::try_new(&decoder,
resolution)?;
+ NullablePlan::FromSingle { resolution }
+ }
+ Some(ResolutionInfo::Union(info)) => {
+ let Some((_, resolution)) =
+
info.writer_to_reader[nullability.non_null_index()].as_ref()
+ else {
+ panic!("unexpected union resolution info for
nullable writer type");
Review Comment:
Same here. The panic on unexpected nullable-union mapping turns a
recoverable schema mismatch into a hard crash.
##########
arrow-avro/src/reader/record.rs:
##########
@@ -504,20 +512,47 @@ impl Decoder {
};
Ok(match data_type.nullability() {
Some(nullability) => {
- // Default to reading a union branch tag unless the resolution
proves otherwise.
- let mut plan = NullablePlan::ReadTag;
- if let Some(ResolutionInfo::Union(info)) =
data_type.resolution.as_ref() {
- if !info.writer_is_union && info.reader_is_union {
- if let Some(Some((_reader_idx, promo))) =
info.writer_to_reader.first() {
- plan = NullablePlan::FromSingle { promotion:
*promo };
+ // Default to reading a union branch tag unless the resolution
directs otherwise.
+ let plan = match &data_type.resolution {
+ None => NullablePlan::ReadTag {
+ nullability,
+ resolution:
ResolutionPlan::Promotion(Promotion::Direct),
+ },
+ Some(ResolutionInfo::Promotion(_)) => {
+ // Promotions should have been incorporated
+ // into the inner decoder.
+ NullablePlan::FromSingle {
+ resolution:
ResolutionPlan::Promotion(Promotion::Direct),
}
}
- }
+ Some(ResolutionInfo::Union(info)) if !info.writer_is_union
=> {
+ let Some(Some((_, resolution))) =
info.writer_to_reader.first() else {
+ panic!(
+ "unexpected union resolution info for
non-union writer and union reader type",
+ );
Review Comment:
This new panic in decoder construction can abort the process for
malformed/partial union resolution state. Since this path already returns
`Result`, I think returning an `AvroError::SchemaError` (or `ParseError`)
instead of panicking would be better, so callers can decide how to handle the
failure.
##########
arrow-avro/src/reader/record.rs:
##########
@@ -1054,10 +1082,45 @@ impl Decoder {
}
}
+ fn decode_with_resolution<'d>(
+ &'d mut self,
+ buf: &mut AvroCursor<'_>,
+ resolution: &'d ResolutionPlan,
+ ) -> Result<(), AvroError> {
+ #[cfg(feature = "avro_custom_types")]
+ if let Self::RunEndEncoded(_, len, inner) = self {
+ *len += 1;
+ return inner.decode_with_resolution(buf, resolution);
+ }
+
+ match resolution {
+ ResolutionPlan::Promotion(promotion) => {
+ let promotion = *promotion;
+ self.decode_with_promotion(buf, promotion)
+ }
+ ResolutionPlan::DefaultValue(lit) => self.append_default(lit),
+ ResolutionPlan::EnumMapping(res) => {
+ let Self::Enum(indices, _, _) = self else {
+ panic!("enum mapping resolution provided for non-enum
decoder");
+ };
+ let raw = buf.get_int()?;
+ let resolved = res.resolve(raw)?;
+ indices.push(resolved);
+ Ok(())
+ }
+ ResolutionPlan::Record(proj) => {
+ let Self::Record(_, encodings, _, _) = self else {
+ panic!("record projection provided for non-record
decoder");
Review Comment:
Same here as well. We should probably be passing `AvroError`s back to the
caller instead of potentially crashing imo.
##########
arrow-avro/src/codec.rs:
##########
@@ -1533,62 +1550,35 @@ impl<'a> Maker<'a> {
nullable_union_variants(reader_variants)
{
let mut dt = self.resolve_type(writer_non_union,
non_null_branch, namespace)?;
- let non_null_idx = match nullability {
- Nullability::NullFirst => 1,
- Nullability::NullSecond => 0,
- };
#[cfg(feature = "avro_custom_types")]
Self::propagate_nullability_into_ree(&mut dt, nullability);
dt.nullability = Some(nullability);
- let promotion = Self::coercion_from(&dt);
- dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
- writer_to_reader: Arc::from(vec![Some((non_null_idx,
promotion))]),
- writer_is_union: false,
- reader_is_union: true,
- }));
+ // Ensure resolution is set to a non-Union variant to
suppress
+ // reading the union tag which is the default behavior.
+ if dt.resolution.is_none() {
+ dt.resolution =
Some(ResolutionInfo::Promotion(Promotion::Direct));
+ }
Ok(dt)
} else {
- let mut best_match: Option<(usize, AvroDataType,
Promotion)> = None;
- for (i, variant) in reader_variants.iter().enumerate() {
- if let Ok(resolved_dt) =
- self.resolve_type(writer_non_union, variant,
namespace)
- {
- let promotion = Self::coercion_from(&resolved_dt);
- if promotion == Promotion::Direct {
- best_match = Some((i, resolved_dt, promotion));
- break;
- } else if best_match.is_none() {
- best_match = Some((i, resolved_dt, promotion));
- }
- }
- }
- let Some((match_idx, match_dt, promotion)) = best_match
else {
+ let Some((match_idx, resolution)) =
+ self.find_best_union_match(writer_non_union,
reader_variants, namespace)
+ else {
return Err(ArrowError::SchemaError(
"Writer schema does not match any reader union
branch".to_string(),
));
};
- let mut children =
Vec::with_capacity(reader_variants.len());
- let mut match_dt = Some(match_dt);
- for (i, variant) in reader_variants.iter().enumerate() {
- if i == match_idx {
- if let Some(mut dt) = match_dt.take() {
- if matches!(dt.resolution,
Some(ResolutionInfo::Promotion(_))) {
- dt.resolution = None;
- }
- children.push(dt);
- }
- } else {
- children.push(self.parse_type(variant,
namespace)?);
- }
- }
+ let children = reader_variants
+ .iter()
+ .map(|variant| self.parse_type(variant, namespace))
+ .collect::<Result<Vec<_>, _>>()?;
Review Comment:
I think there's a risk of a regression here from not using `resolve_type`
to preserve field-level resolution, especially `DefaultValue`.
See the test below and let me know what you think.
```rust
/// Checks that resolving a non-union writer record against a reader union keeps
/// field-level resolution on the matching union branch.
///
/// The writer is `record Inner { a: int }`; the reader is
/// `[Inner { a: int, b: int default 42 }, string]`. After resolution, the `Inner`
/// child of the resulting union codec is expected to carry
/// `ResolutionInfo::DefaultValue(AvroLiteral::Int(42))` on field `b`, because
/// the writer never supplies that field.
#[test]
fn test_resolve_writer_non_union_record_to_reader_union_preserves_defaults() {
    // Writer: record Inner{a: int}
    // Reader: union [Inner{a: int, b: int default 42}, string]
    // The matching child (Inner) should preserve DefaultValue(Int(42)) on field b.
    let writer = Schema::Complex(ComplexType::Record(Record {
        name: "Inner",
        namespace: None,
        doc: None,
        aliases: vec![],
        fields: vec![AvroFieldSchema {
            name: "a",
            doc: None,
            r#type: mk_primitive(PrimitiveType::Int),
            default: None,
            aliases: vec![],
        }],
        attributes: Attributes::default(),
    }));
    let reader = mk_union(vec![
        Schema::Complex(ComplexType::Record(Record {
            name: "Inner",
            namespace: None,
            doc: None,
            aliases: vec![],
            fields: vec![
                AvroFieldSchema {
                    name: "a",
                    doc: None,
                    r#type: mk_primitive(PrimitiveType::Int),
                    default: None,
                    aliases: vec![],
                },
                // Field present only in the reader schema; its default must
                // survive resolution.
                AvroFieldSchema {
                    name: "b",
                    doc: None,
                    r#type: mk_primitive(PrimitiveType::Int),
                    default: Some(Value::Number(serde_json::Number::from(42))),
                    aliases: vec![],
                },
            ],
            attributes: Attributes::default(),
        })),
        mk_primitive(PrimitiveType::String),
    ]);
    let mut maker = Maker::new(false, false);
    let dt = maker
        .make_data_type(&writer, Some(&reader), None)
        .expect("resolution should succeed");
    // Verify the union resolution structure
    let resolved = match dt.resolution.as_ref() {
        Some(ResolutionInfo::Union(u)) => u,
        other => panic!("expected union resolution info, got {other:?}"),
    };
    assert!(!resolved.writer_is_union && resolved.reader_is_union);
    // The matching child (Inner at index 0) should have field b with DefaultValue
    let children = match dt.codec() {
        Codec::Union(children, _, _) => children,
        other => panic!("expected union codec, got {other:?}"),
    };
    let inner_fields = match children[0].codec() {
        Codec::Struct(f) => f,
        other => panic!("expected struct codec for Inner, got {other:?}"),
    };
    assert_eq!(inner_fields.len(), 2);
    assert_eq!(inner_fields[1].name(), "b");
    assert_eq!(
        inner_fields[1].data_type().resolution,
        Some(ResolutionInfo::DefaultValue(AvroLiteral::Int(42))),
        "field b should have DefaultValue(Int(42)) from schema resolution"
    );
}
```
##########
arrow-avro/src/reader/record.rs:
##########
@@ -1315,64 +1455,94 @@ const NO_SOURCE: i8 = -1;
impl DispatchLookupTable {
fn from_writer_to_reader(
- promotion_map: &[Option<(usize, Promotion)>],
+ reader_branches: &[Decoder],
+ resolution_map: &[Option<(usize, ResolutionInfo)>],
) -> Result<Self, AvroError> {
- let mut to_reader = Vec::with_capacity(promotion_map.len());
- let mut promotion = Vec::with_capacity(promotion_map.len());
- for map in promotion_map {
- match *map {
- Some((idx, promo)) => {
+ let mut to_reader = Vec::with_capacity(resolution_map.len());
+ let mut resolution = Vec::with_capacity(resolution_map.len());
+ for map in resolution_map {
+ match map {
+ Some((idx, res)) => {
+ let idx = *idx;
let idx_i8 = i8::try_from(idx).map_err(|_| {
AvroError::SchemaError(format!(
"Reader branch index {idx} exceeds i8 range (max
{})",
i8::MAX
))
})?;
+ let plan = ResolutionPlan::try_new(&reader_branches[idx],
res)?;
to_reader.push(idx_i8);
- promotion.push(promo);
+ resolution.push(plan);
}
None => {
to_reader.push(NO_SOURCE);
- promotion.push(Promotion::Direct);
+
resolution.push(ResolutionPlan::DefaultValue(AvroLiteral::Null));
}
}
}
Ok(Self {
to_reader: to_reader.into_boxed_slice(),
- promotion: promotion.into_boxed_slice(),
+ resolution: resolution.into_boxed_slice(),
})
}
- // Resolve a writer branch index to (reader_idx, promotion)
+ // Resolve a writer branch index to (reader_idx, resolution)
#[inline]
- fn resolve(&self, writer_index: usize) -> Option<(usize, Promotion)> {
+ fn resolve(&self, writer_index: usize) -> Option<(usize, &ResolutionPlan)>
{
let reader_index = *self.to_reader.get(writer_index)?;
- (reader_index >= 0).then(|| (reader_index as usize,
self.promotion[writer_index]))
+ (reader_index >= 0).then(|| (reader_index as usize,
&self.resolution[writer_index]))
}
}
#[derive(Debug)]
struct UnionDecoder {
fields: UnionFields,
- type_ids: Vec<i8>,
- offsets: Vec<i32>,
- branches: Vec<Decoder>,
- counts: Vec<i32>,
- reader_type_codes: Vec<i8>,
+ branches: UnionDecoderBranches,
Review Comment:
Nice! Love the cleanup here :)
##########
arrow-avro/src/reader/record.rs:
##########
@@ -1283,6 +1346,83 @@ impl Decoder {
}
}
+/// Runtime plan for decoding reader-side `["null", T]` types.
+#[derive(Debug)]
+enum NullablePlan {
+ /// Writer actually wrote a union (branch tag present).
+ ReadTag {
+ nullability: Nullability,
+ resolution: ResolutionPlan,
+ },
+ /// Writer wrote a single (non-union) value resolved to the non-null branch
+ /// of the reader union; do NOT read a branch tag, but apply any
resolution.
+ FromSingle { resolution: ResolutionPlan },
+}
+
+/// Runtime plan for resolving writer-reader type differences.
+#[derive(Debug)]
+enum ResolutionPlan {
+ /// Indicates that the writer's type should be promoted to the reader's
type.
+ Promotion(Promotion),
+ /// Provides a default value for the field missing in the writer type.
+ DefaultValue(AvroLiteral),
+ /// Provides mapping information for resolving enums.
+ EnumMapping(EnumResolution),
+ /// Provides projection information for record fields.
+ Record(Projector),
+}
+
+impl ResolutionPlan {
+ fn try_new(decoder: &Decoder, resolution: &ResolutionInfo) -> Result<Self,
AvroError> {
+ match (decoder, resolution) {
+ (_, ResolutionInfo::Promotion(p)) =>
Ok(ResolutionPlan::Promotion(*p)),
+ (_, ResolutionInfo::DefaultValue(lit)) =>
Ok(ResolutionPlan::DefaultValue(lit.clone())),
+ (_, ResolutionInfo::EnumMapping(m)) => {
+ Ok(ResolutionPlan::EnumMapping(EnumResolution::new(m)))
+ }
+ (Decoder::Record(_, _, field_defaults, _),
ResolutionInfo::Record(r)) => Ok(
+ ResolutionPlan::Record(ProjectorBuilder::try_new(r,
field_defaults).build()?),
+ ),
+ (_, ResolutionInfo::Record(_)) => {
+ unreachable!("record resolution on non-record decoder")
Review Comment:
Using `unreachable!` here is risky, since it panics if the branch is ever
hit. Probably best to pass an `AvroError` back here too. Perhaps it's
unreachable right now, but all it takes is for an edge case to occur or some
change down the line to create the potential for a crash.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]