scovich commented on code in PR #8349:
URL: https://github.com/apache/arrow-rs/pull/8349#discussion_r2362528166
##########
arrow-avro/src/reader/record.rs:
##########
@@ -214,6 +254,148 @@ struct EnumResolution {
default_index: i32,
}
+#[derive(Debug, Clone, Copy)]
+enum BranchDispatch {
+ NoMatch,
+ ToReader {
+ reader_idx: usize,
+ promotion: Promotion,
+ },
+}
+
+#[derive(Debug)]
+struct UnionResolution {
+ dispatch: Option<Arc<[BranchDispatch]>>,
+ kind: UnionResolvedKind,
+}
+
+#[derive(Debug)]
+enum UnionResolvedKind {
+ Both {
+ reader_type_codes: Arc<[i8]>,
+ },
+ ToSingle {
+ target: Box<Decoder>,
+ },
+ FromSingle {
+ reader_type_codes: Arc<[i8]>,
+ target_reader_index: usize,
+ promotion: Promotion,
+ },
+}
+
+#[derive(Debug, Default)]
+struct UnionResolutionBuilder {
+ fields: Option<UnionFields>,
+ resolved: Option<ResolvedUnion>,
+}
+
+impl UnionResolutionBuilder {
+ #[inline]
Review Comment:
It seems highly unlikely that building union resolvers would be on the
critical path of any workload that processes actual data. Is `#[inline]`
really needed on these methods?
##########
arrow-avro/src/reader/record.rs:
##########
@@ -214,6 +254,148 @@ struct EnumResolution {
default_index: i32,
}
+#[derive(Debug, Clone, Copy)]
+enum BranchDispatch {
+ NoMatch,
+ ToReader {
+ reader_idx: usize,
+ promotion: Promotion,
+ },
+}
+
+#[derive(Debug)]
+struct UnionResolution {
+ dispatch: Option<Arc<[BranchDispatch]>>,
+ kind: UnionResolvedKind,
+}
+
+#[derive(Debug)]
+enum UnionResolvedKind {
+ Both {
+ reader_type_codes: Arc<[i8]>,
+ },
+ ToSingle {
+ target: Box<Decoder>,
Review Comment:
`Decoder` seems to be a concrete type, so why boxed? Is there somehow
infinite type recursion?
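
For context on the question above: a Rust enum that contains itself by value has no finite size, so some form of indirection is required. The sketch below uses hypothetical, simplified stand-ins (`MiniDecoder`, `MiniResolution`), not the PR's types, and assumes the recursion runs from `Decoder::Union` through `UnionResolution` into `UnionResolvedKind::ToSingle`, as the hunks in this review suggest.

```rust
// Hypothetical, simplified stand-ins for the PR's `UnionResolvedKind` and `Decoder`.
#[derive(Debug)]
enum MiniResolution {
    // Without `Box`, `MiniDecoder` would contain itself by value and rustc
    // would reject it with E0072 (recursive type has infinite size).
    ToSingle { target: Box<MiniDecoder> },
}

#[derive(Debug)]
enum MiniDecoder {
    Int32(Vec<i32>),
    Union(Option<MiniResolution>),
}

/// Counts how many decoders are nested through `ToSingle` targets.
fn depth(d: &MiniDecoder) -> usize {
    match d {
        MiniDecoder::Int32(_) => 1,
        MiniDecoder::Union(None) => 1,
        MiniDecoder::Union(Some(MiniResolution::ToSingle { target })) => 1 + depth(target),
    }
}
```

If that assumption holds, the `Box` is what breaks the cycle; the `Vec<Decoder>` inside the union variant already provides indirection for the child decoders.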
##########
arrow-avro/src/reader/record.rs:
##########
@@ -486,6 +734,70 @@ impl Decoder {
Self::Decimal256(_, _, _, builder) =>
builder.append_value(i256::ZERO),
Self::Enum(indices, _, _) => indices.push(0),
Self::Duration(builder) => builder.append_null(),
+ Self::Union(fields, type_ids, offsets, encodings, encoding_counts,
None) => {
+ let mut chosen = None;
+ for (i, ch) in encodings.iter().enumerate() {
+ if matches!(ch, Decoder::Null(_)) {
+ chosen = Some(i);
+ break;
+ }
+ }
+ let idx = chosen.unwrap_or(0);
+ let type_id = fields
+ .iter()
+ .nth(idx)
+ .map(|(type_id, _)| type_id)
+ .unwrap_or_else(|| i8::try_from(idx).unwrap_or(0));
Review Comment:
When could the iter-nth call come up empty? Are unions allowed to be empty,
so that defaulting to idx=0 could produce an out-of-bounds access? Or
is it possible that `encodings.len() != fields.len()`?
If the former, I recommend keeping `idx` as an `Option`, so the control flow
is more direct (= easier to follow).
If the latter... I don't think I understand what this code is doing (which
is probably true either way, if I'm being honest)
##########
arrow-avro/src/reader/record.rs:
##########
@@ -425,6 +645,34 @@ impl Decoder {
Box::new(val_dec),
)
}
+ (Codec::Union(encodings, fields, mode), _) => {
+ if *mode != UnionMode::Dense {
+ return Err(ArrowError::NotYetImplemented(
+ "Sparse Arrow unions are not yet
supported".to_string(),
+ ));
+ }
+ let mut decoders = Vec::with_capacity(encodings.len());
+ for c in encodings.iter() {
+ decoders.push(Self::try_new_internal(c)?);
+ }
Review Comment:
```suggestion
let decoders = encodings.iter().map(Self::try_new_internal);
```
and then below, pass
```rust
decoders.collect::<Result<Vec<_>, _>>()?,
```
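
A minimal, self-contained sketch of the pattern suggested above: a lazy `map` over a fallible constructor, collected into a single `Result<Vec<_>, _>` that stops at the first error. `parse_width` is a hypothetical stand-in for `try_new_internal`, and `String` stands in for `ArrowError`.

```rust
/// Hypothetical stand-in for a fallible constructor such as `try_new_internal`.
fn parse_width(s: &str) -> Result<u32, String> {
    s.parse::<u32>().map_err(|e| format!("bad width {s:?}: {e}"))
}

/// Builds every item, returning the first error encountered (if any).
fn parse_all(inputs: &[&str]) -> Result<Vec<u32>, String> {
    // `map` is lazy: nothing runs until `collect` drives the iterator.
    let widths = inputs.iter().map(|s| parse_width(s));
    // Collecting into `Result<Vec<_>, _>` short-circuits on the first `Err`.
    widths.collect::<Result<Vec<_>, _>>()
}
```

One side effect worth noting: with the lazy iterator, construction errors surface at the `collect` call site rather than inside the loop.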
##########
arrow-avro/src/reader/record.rs:
##########
@@ -700,6 +1012,65 @@ impl Decoder {
"Default for enum must be a symbol".to_string(),
)),
},
+ Self::Union(fields, type_ids, offsets, encodings, encoding_counts,
None) => {
+ if encodings.is_empty() {
+ return Err(ArrowError::InvalidArgumentError(
+ "Union default cannot be applied to empty
union".to_string(),
+ ));
+ }
+ let type_id = fields
+ .iter()
+ .nth(0)
+ .map(|(type_id, _)| type_id)
+ .unwrap_or(0_i8);
+ type_ids.push(type_id);
+ offsets.push(encoding_counts[0]);
+ encodings[0].append_default(lit)?;
+ encoding_counts[0] += 1;
+ Ok(())
+ }
+ Self::Union(
+ fields,
+ type_ids,
+ offsets,
+ encodings,
+ encoding_counts,
+ Some(union_resolution),
+ ) => match &mut union_resolution.kind {
+ UnionResolvedKind::Both { .. } => {
+ if encodings.is_empty() {
+ return Err(ArrowError::InvalidArgumentError(
+ "Union default cannot be applied to empty
union".to_string(),
+ ));
+ }
+ let type_id = fields
+ .iter()
+ .nth(0)
+ .map(|(type_id, _)| type_id)
+ .unwrap_or(0_i8);
+ type_ids.push(type_id);
+ offsets.push(encoding_counts[0]);
+ encodings[0].append_default(lit)?;
+ encoding_counts[0] += 1;
+ Ok(())
+ }
+ UnionResolvedKind::ToSingle { target } =>
target.append_default(lit),
+ UnionResolvedKind::FromSingle {
+ target_reader_index,
+ ..
+ } => {
+ let type_id = fields
Review Comment:
Seems like a lot of redundancy between `Both` and `FromSingle` cases?
I _think_ the former just hardwires `target_reader_index=0`?
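
One way the shared bookkeeping could be factored out, sketched with a hypothetical `MiniUnion` type (not the PR's `Decoder::Union`): both cases call the same helper, and only the branch index differs. The child decoder's own `append_default` call is left out here.

```rust
/// Simplified stand-in for the dense-union buffers in `Decoder::Union`.
#[derive(Debug)]
struct MiniUnion {
    type_codes: Vec<i8>, // type id of each branch, in branch order
    type_ids: Vec<i8>,   // per-row type id
    offsets: Vec<i32>,   // per-row offset into the chosen child
    counts: Vec<i32>,    // rows appended to each child so far
}

impl MiniUnion {
    fn new(type_codes: Vec<i8>) -> Self {
        let counts = vec![0; type_codes.len()];
        Self { type_codes, type_ids: Vec::new(), offsets: Vec::new(), counts }
    }

    /// Records one row for branch `idx`; errors instead of guessing a type id.
    fn append_to_branch(&mut self, idx: usize) -> Result<(), String> {
        let Some(&type_id) = self.type_codes.get(idx) else {
            return Err(format!("branch index {idx} out of range"));
        };
        self.type_ids.push(type_id);
        self.offsets.push(self.counts[idx]);
        self.counts[idx] += 1;
        Ok(())
    }
}
```

Under this sketch, the `Both` case would call `append_to_branch(0)` and the `FromSingle` case `append_to_branch(target_reader_index)`.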
##########
arrow-avro/src/reader/record.rs:
##########
@@ -852,6 +1303,94 @@ impl Decoder {
Ok(())
}
+ fn decode_with_promotion(
+ &mut self,
+ buf: &mut AvroCursor<'_>,
+ promotion: Promotion,
+ ) -> Result<(), ArrowError> {
+ match promotion {
+ Promotion::Direct => self.decode(buf),
+ Promotion::IntToLong => match self {
+ Self::Int64(v) => {
+ v.push(buf.get_int()? as i64);
+ Ok(())
+ }
+ _ => Err(ArrowError::ParseError(
+ "Promotion Int->Long target mismatch".into(),
+ )),
+ },
+ Promotion::IntToFloat => match self {
+ Self::Float32(v) => {
+ v.push(buf.get_int()? as f32);
+ Ok(())
+ }
+ _ => Err(ArrowError::ParseError(
+ "Promotion Int->Float target mismatch".into(),
+ )),
+ },
+ Promotion::IntToDouble => match self {
+ Self::Float64(v) => {
+ v.push(buf.get_int()? as f64);
+ Ok(())
+ }
+ _ => Err(ArrowError::ParseError(
+ "Promotion Int->Double target mismatch".into(),
+ )),
+ },
+ Promotion::LongToFloat => match self {
+ Self::Float32(v) => {
+ v.push(buf.get_long()? as f32);
+ Ok(())
+ }
+ _ => Err(ArrowError::ParseError(
+ "Promotion Long->Float target mismatch".into(),
+ )),
+ },
+ Promotion::LongToDouble => match self {
+ Self::Float64(v) => {
+ v.push(buf.get_long()? as f64);
+ Ok(())
+ }
+ _ => Err(ArrowError::ParseError(
+ "Promotion Long->Double target mismatch".into(),
+ )),
+ },
+ Promotion::FloatToDouble => match self {
+ Self::Float64(v) => {
+ v.push(buf.get_float()? as f64);
+ Ok(())
+ }
+ _ => Err(ArrowError::ParseError(
+ "Promotion Float->Double target mismatch".into(),
+ )),
+ },
+ Promotion::StringToBytes => match self {
+ Self::Binary(offsets, values) | Self::StringToBytes(offsets,
values) => {
+ let data = buf.get_bytes()?;
+ offsets.push_length(data.len());
+ values.extend_from_slice(data);
+ Ok(())
+ }
+ _ => Err(ArrowError::ParseError(
+ "Promotion String->Bytes target mismatch".into(),
+ )),
+ },
+ Promotion::BytesToString => match self {
Review Comment:
This one isn't the same as the others... it's technically a narrowing cast
which could fail if the input bytes are not valid utf-8. At least the int ->
float conversions are converting casts (= infallible).
Does the spec require this? And if so, should we validate the data here?
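
If validation is wanted, a minimal sketch could check the bytes with `std::str::from_utf8` before appending. The function name and the plain `Vec` buffers are assumptions for illustration; the PR's offset and value buffers have their own types.

```rust
/// Appends `data` to a string column only if it is valid UTF-8.
/// `offsets` and `values` are simplified stand-ins for the decoder's buffers.
fn push_bytes_as_string(
    offsets: &mut Vec<usize>,
    values: &mut Vec<u8>,
    data: &[u8],
) -> Result<(), String> {
    // Reject invalid input before touching the buffers, so a failed
    // row leaves the column unchanged.
    std::str::from_utf8(data).map_err(|e| format!("invalid UTF-8: {e}"))?;
    values.extend_from_slice(data);
    offsets.push(values.len());
    Ok(())
}
```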
##########
arrow-avro/src/reader/record.rs:
##########
@@ -852,6 +1303,94 @@ impl Decoder {
Ok(())
}
+ fn decode_with_promotion(
+ &mut self,
+ buf: &mut AvroCursor<'_>,
+ promotion: Promotion,
+ ) -> Result<(), ArrowError> {
+ match promotion {
+ Promotion::Direct => self.decode(buf),
+ Promotion::IntToLong => match self {
+ Self::Int64(v) => {
+ v.push(buf.get_int()? as i64);
Review Comment:
Seems better to document the lossless conversions by e.g.
```suggestion
v.push(i64::from(buf.get_int()?));
```
or even
```suggestion
v.push(buf.get_int()?.into());
```
?
(I think most of these promotions are lossless and could benefit from the
same)
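
For reference, a small sketch of which promotions the standard library treats as lossless. `From` is implemented only where every source value is exactly representable, so the compiler itself documents the distinction. The function names are made up for illustration.

```rust
/// Int -> Long: every i32 fits in an i64, so `From` is implemented.
fn int_to_long(v: i32) -> i64 {
    i64::from(v)
}

/// Int -> Double: f64 has a 53-bit significand, so every i32 is exact.
fn int_to_double(v: i32) -> f64 {
    f64::from(v)
}

/// Float -> Double: every f32 value is exactly representable as f64.
fn float_to_double(v: f32) -> f64 {
    f64::from(v)
}

/// Int -> Float: std has no `From<i32> for f32` (nor `From<i64> for f64`),
/// so `as` is still needed here, and it may round.
fn int_to_float(v: i32) -> f32 {
    v as f32
}
```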
##########
arrow-avro/src/reader/record.rs:
##########
@@ -214,6 +254,148 @@ struct EnumResolution {
default_index: i32,
}
+#[derive(Debug, Clone, Copy)]
+enum BranchDispatch {
+ NoMatch,
+ ToReader {
+ reader_idx: usize,
+ promotion: Promotion,
+ },
+}
+
+#[derive(Debug)]
+struct UnionResolution {
+ dispatch: Option<Arc<[BranchDispatch]>>,
+ kind: UnionResolvedKind,
+}
+
+#[derive(Debug)]
+enum UnionResolvedKind {
+ Both {
+ reader_type_codes: Arc<[i8]>,
+ },
+ ToSingle {
+ target: Box<Decoder>,
+ },
+ FromSingle {
+ reader_type_codes: Arc<[i8]>,
+ target_reader_index: usize,
+ promotion: Promotion,
+ },
+}
+
+#[derive(Debug, Default)]
+struct UnionResolutionBuilder {
+ fields: Option<UnionFields>,
+ resolved: Option<ResolvedUnion>,
+}
+
+impl UnionResolutionBuilder {
+ #[inline]
+ fn new() -> Self {
+ Self {
+ fields: None,
+ resolved: None,
+ }
+ }
+
+ #[inline]
+ fn with_fields(mut self, fields: UnionFields) -> Self {
+ self.fields = Some(fields);
+ self
+ }
+
+ #[inline]
+ fn with_resolved_union(mut self, resolved_union: &ResolvedUnion) -> Self {
+ self.resolved = Some(resolved_union.clone());
+ self
+ }
+
+ fn build(self) -> Result<UnionResolution, ArrowError> {
+ let info = self.resolved.ok_or_else(|| {
+ ArrowError::InvalidArgumentError(
+ "UnionResolutionBuilder requires resolved_union to be
provided".to_string(),
+ )
+ })?;
+ match (info.writer_is_union, info.reader_is_union) {
+ (true, true) => {
+ let fields = self.fields.ok_or_else(|| {
+ ArrowError::InvalidArgumentError(
+ "UnionResolutionBuilder for reader union requires
fields".to_string(),
+ )
+ })?;
+ let reader_type_codes: Vec<i8> =
+ fields.iter().map(|(tid, _)| tid).collect::<Vec<_>>();
Review Comment:
nit
```suggestion
let reader_type_codes: Vec<i8> = fields.iter().map(|(tid, _)| tid).collect();
```
(tho fmt will probably split the monadic chain across several lines because
"too complex")
(again below)
##########
arrow-avro/src/reader/record.rs:
##########
@@ -214,6 +254,148 @@ struct EnumResolution {
default_index: i32,
}
+#[derive(Debug, Clone, Copy)]
+enum BranchDispatch {
+ NoMatch,
+ ToReader {
+ reader_idx: usize,
+ promotion: Promotion,
+ },
+}
+
+#[derive(Debug)]
+struct UnionResolution {
+ dispatch: Option<Arc<[BranchDispatch]>>,
+ kind: UnionResolvedKind,
+}
+
+#[derive(Debug)]
+enum UnionResolvedKind {
+ Both {
+ reader_type_codes: Arc<[i8]>,
+ },
+ ToSingle {
+ target: Box<Decoder>,
+ },
+ FromSingle {
+ reader_type_codes: Arc<[i8]>,
+ target_reader_index: usize,
+ promotion: Promotion,
+ },
+}
+
+#[derive(Debug, Default)]
+struct UnionResolutionBuilder {
+ fields: Option<UnionFields>,
+ resolved: Option<ResolvedUnion>,
+}
+
+impl UnionResolutionBuilder {
+ #[inline]
+ fn new() -> Self {
+ Self {
+ fields: None,
+ resolved: None,
+ }
+ }
+
+ #[inline]
+ fn with_fields(mut self, fields: UnionFields) -> Self {
+ self.fields = Some(fields);
+ self
+ }
+
+ #[inline]
+ fn with_resolved_union(mut self, resolved_union: &ResolvedUnion) -> Self {
+ self.resolved = Some(resolved_union.clone());
Review Comment:
I believe it's recommended to take an owned arg, if owned is needed.
Otherwise, a caller who _could_ relinquish ownership of a value cannot do so,
and we end up needlessly cloning.
... but it looks like all callers of this (private) method only have a
reference, so maybe it's fine to leave as-is?
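
To illustrate the trade-off with hypothetical types (`Resolved` and `Builder` are stand-ins, not the PR's `ResolvedUnion` and `UnionResolutionBuilder`): taking the argument by value lets an owning caller move it in, while a caller with only a reference clones at the call site, where the cost is visible.

```rust
#[derive(Debug, Clone, PartialEq)]
struct Resolved {
    branches: Vec<usize>,
}

#[derive(Debug, Default)]
struct Builder {
    resolved: Option<Resolved>,
}

impl Builder {
    /// Takes ownership: callers that already own the value move it in for free.
    fn with_resolved(mut self, resolved: Resolved) -> Self {
        self.resolved = Some(resolved);
        self
    }
}

/// A caller that only holds a reference clones explicitly at the call site.
fn build_from_ref(resolved: &Resolved) -> Builder {
    Builder::default().with_resolved(resolved.clone())
}
```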
##########
arrow-avro/src/reader/record.rs:
##########
@@ -214,6 +254,148 @@ struct EnumResolution {
default_index: i32,
}
+#[derive(Debug, Clone, Copy)]
+enum BranchDispatch {
+ NoMatch,
+ ToReader {
+ reader_idx: usize,
+ promotion: Promotion,
+ },
+}
+
+#[derive(Debug)]
+struct UnionResolution {
+ dispatch: Option<Arc<[BranchDispatch]>>,
+ kind: UnionResolvedKind,
+}
+
+#[derive(Debug)]
+enum UnionResolvedKind {
+ Both {
+ reader_type_codes: Arc<[i8]>,
+ },
+ ToSingle {
+ target: Box<Decoder>,
+ },
+ FromSingle {
+ reader_type_codes: Arc<[i8]>,
+ target_reader_index: usize,
+ promotion: Promotion,
+ },
+}
+
+#[derive(Debug, Default)]
+struct UnionResolutionBuilder {
+ fields: Option<UnionFields>,
+ resolved: Option<ResolvedUnion>,
+}
+
+impl UnionResolutionBuilder {
+ #[inline]
+ fn new() -> Self {
+ Self {
+ fields: None,
+ resolved: None,
+ }
+ }
+
+ #[inline]
+ fn with_fields(mut self, fields: UnionFields) -> Self {
+ self.fields = Some(fields);
+ self
+ }
+
+ #[inline]
+ fn with_resolved_union(mut self, resolved_union: &ResolvedUnion) -> Self {
+ self.resolved = Some(resolved_union.clone());
+ self
+ }
+
+ fn build(self) -> Result<UnionResolution, ArrowError> {
+ let info = self.resolved.ok_or_else(|| {
+ ArrowError::InvalidArgumentError(
+ "UnionResolutionBuilder requires resolved_union to be
provided".to_string(),
+ )
+ })?;
+ match (info.writer_is_union, info.reader_is_union) {
Review Comment:
Lots of redundancy in the various match arms... can we pre-compute the main
bits to reduce the bug surface?
```rust
let writer_dispatch = info.writer_is_union.then(|| {
    info.writer_to_reader
        .iter()
        .map(...)
        .collect::<Vec<_>>()
});
let reader_type_codes = if info.reader_is_union {
    let Some(fields) = self.fields else {
        return Err(...);
    };
    let reader_type_codes: Vec<i8> = fields.iter().map(...).collect();
    Some(reader_type_codes)
} else {
    None
};
match (writer_dispatch, reader_type_codes) {
    (Some(dispatch), Some(reader_type_codes)) => Ok(UnionResolution { ... Both ... }),
    (Some(dispatch), None) => Ok(UnionResolution { ... ToSingle ... }),
    (None, Some(reader_type_codes)) => Ok(UnionResolution { ... FromSingle ... }),
    (None, None) => Err(...),
}
```
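
A compilable version of the same idea, using simplified hypothetical types (`Kind`, `resolve`, `String` errors) in place of the PR's. It follows the PR's own usage, where a writer union read as a non-union is `ToSingle` and a non-union writer read as a union is `FromSingle`.

```rust
#[derive(Debug, PartialEq)]
enum Kind {
    Both { dispatch: Vec<Option<usize>>, reader_type_codes: Vec<i8> },
    ToSingle { dispatch: Vec<Option<usize>> },
    FromSingle { reader_type_codes: Vec<i8> },
}

fn resolve(
    writer_is_union: bool,
    reader_is_union: bool,
    writer_to_reader: &[Option<usize>],
    fields: Option<Vec<i8>>,
) -> Result<Kind, String> {
    // Pre-compute each side once; `None` means "that side is not a union".
    let dispatch = writer_is_union.then(|| writer_to_reader.to_vec());
    let reader_type_codes = if reader_is_union {
        let Some(codes) = fields else {
            return Err("reader union requires fields".to_string());
        };
        Some(codes)
    } else {
        None
    };
    // The match arms now only assemble the result.
    match (dispatch, reader_type_codes) {
        (Some(dispatch), Some(reader_type_codes)) => {
            Ok(Kind::Both { dispatch, reader_type_codes })
        }
        (Some(dispatch), None) => Ok(Kind::ToSingle { dispatch }),
        (None, Some(reader_type_codes)) => Ok(Kind::FromSingle { reader_type_codes }),
        (None, None) => Err("neither side is a union".to_string()),
    }
}
```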
##########
arrow-avro/src/reader/record.rs:
##########
@@ -83,6 +84,46 @@ macro_rules! append_decimal_default {
}};
}
+macro_rules! flush_union {
+ ($fields:expr, $type_ids:expr, $offsets:expr, $encodings:expr) => {{
+ let encoding_arrays = $encodings
+ .iter_mut()
+ .map(|d| d.flush(None))
+ .collect::<Result<Vec<_>, _>>()?;
+ let type_ids_buf: ScalarBuffer<i8> =
flush_values($type_ids).into_iter().collect();
+ let offsets_buf: ScalarBuffer<i32> =
flush_values($offsets).into_iter().collect();
+ let arr = UnionArray::try_new(
+ $fields.clone(),
+ type_ids_buf,
+ Some(offsets_buf),
+ encoding_arrays,
+ )
+ .map_err(|e| ArrowError::ParseError(e.to_string()))?;
+ Arc::new(arr)
+ }};
+}
+
+macro_rules! get_writer_union_action {
+ ($buf:expr, $union_resolution:expr) => {{
+ let branch = $buf.get_long()?;
+ if branch < 0 {
+ return Err(ArrowError::ParseError(format!(
+ "Negative union branch index {branch}"
+ )));
+ }
+ let idx = branch as usize;
+ let dispatch = match $union_resolution.dispatch.as_deref() {
+ Some(d) => d,
+ None => {
+ return Err(ArrowError::SchemaError(
+ "dispatch table missing for writer=union".to_string(),
+ ));
+ }
+ };
Review Comment:
```suggestion
let Some(dispatch) = $union_resolution.dispatch.as_deref() else {
return Err(ArrowError::SchemaError(
"dispatch table missing for writer=union".to_string(),
));
};
```
##########
arrow-avro/src/reader/record.rs:
##########
@@ -259,12 +441,50 @@ enum Decoder {
Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
+ Union(
+ UnionFields,
+ Vec<i8>,
+ Vec<i32>,
+ Vec<Decoder>,
+ Vec<i32>,
+ Option<UnionResolution>,
+ ),
Nullable(Nullability, NullBufferBuilder, Box<Decoder>),
}
impl Decoder {
fn try_new(data_type: &AvroDataType) -> Result<Self, ArrowError> {
- // Extract just the Promotion (if any) to simplify pattern matching
+ if let Some(ResolutionInfo::Union(info)) =
data_type.resolution.as_ref() {
+ if info.writer_is_union && !info.reader_is_union {
+ let mut clone = data_type.clone();
+ clone.resolution = None;
+ let target = Box::new(Self::try_new_internal(&clone)?);
+ let mut union_resolution = UnionResolutionBuilder::new()
+ .with_resolved_union(info)
+ .build()?;
+ if let UnionResolvedKind::ToSingle { target: t } = &mut
union_resolution.kind {
+ *t = target;
+ }
+ let base = Self::Union(
+ UnionFields::empty(),
+ Vec::new(),
+ Vec::new(),
+ Vec::new(),
+ Vec::new(),
+ Some(union_resolution),
+ );
+ return Ok(match data_type.nullability() {
+ Some(n) => {
+ Self::Nullable(n,
NullBufferBuilder::new(DEFAULT_CAPACITY), Box::new(base))
+ }
+ None => base,
+ });
Review Comment:
With a `mut base` could do:
```suggestion
if let Some(n) = data_type.nullability() {
    base = Self::Nullable(n, NullBufferBuilder::new(DEFAULT_CAPACITY), Box::new(base));
}
return Ok(base);
```
##########
arrow-avro/src/reader/record.rs:
##########
@@ -214,6 +254,148 @@ struct EnumResolution {
default_index: i32,
}
+#[derive(Debug, Clone, Copy)]
+enum BranchDispatch {
+ NoMatch,
+ ToReader {
+ reader_idx: usize,
+ promotion: Promotion,
+ },
+}
+
+#[derive(Debug)]
+struct UnionResolution {
+ dispatch: Option<Arc<[BranchDispatch]>>,
+ kind: UnionResolvedKind,
+}
+
+#[derive(Debug)]
+enum UnionResolvedKind {
+ Both {
+ reader_type_codes: Arc<[i8]>,
+ },
+ ToSingle {
+ target: Box<Decoder>,
+ },
+ FromSingle {
+ reader_type_codes: Arc<[i8]>,
+ target_reader_index: usize,
+ promotion: Promotion,
+ },
+}
+
+#[derive(Debug, Default)]
+struct UnionResolutionBuilder {
+ fields: Option<UnionFields>,
+ resolved: Option<ResolvedUnion>,
+}
+
+impl UnionResolutionBuilder {
+ #[inline]
+ fn new() -> Self {
+ Self {
+ fields: None,
+ resolved: None,
+ }
+ }
+
+ #[inline]
+ fn with_fields(mut self, fields: UnionFields) -> Self {
+ self.fields = Some(fields);
+ self
+ }
+
+ #[inline]
+ fn with_resolved_union(mut self, resolved_union: &ResolvedUnion) -> Self {
+ self.resolved = Some(resolved_union.clone());
+ self
+ }
+
+ fn build(self) -> Result<UnionResolution, ArrowError> {
+ let info = self.resolved.ok_or_else(|| {
+ ArrowError::InvalidArgumentError(
+ "UnionResolutionBuilder requires resolved_union to be
provided".to_string(),
+ )
+ })?;
+ match (info.writer_is_union, info.reader_is_union) {
+ (true, true) => {
+ let fields = self.fields.ok_or_else(|| {
+ ArrowError::InvalidArgumentError(
+ "UnionResolutionBuilder for reader union requires
fields".to_string(),
+ )
+ })?;
+ let reader_type_codes: Vec<i8> =
+ fields.iter().map(|(tid, _)| tid).collect::<Vec<_>>();
+ let dispatch: Vec<BranchDispatch> = info
+ .writer_to_reader
+ .iter()
+ .map(|m| match m {
+ Some((reader_index, promotion)) =>
BranchDispatch::ToReader {
+ reader_idx: *reader_index,
+ promotion: *promotion,
+ },
+ None => BranchDispatch::NoMatch,
+ })
+ .collect();
+ Ok(UnionResolution {
+ dispatch: Some(Arc::from(dispatch)),
+ kind: UnionResolvedKind::Both {
+ reader_type_codes: Arc::from(reader_type_codes),
+ },
+ })
+ }
+ (false, true) => {
+ let fields = self.fields.ok_or_else(|| {
+ ArrowError::InvalidArgumentError(
+ "UnionResolutionBuilder for reader union requires
fields".to_string(),
+ )
+ })?;
+ let reader_type_codes: Vec<i8> =
+ fields.iter().map(|(tid, _)| tid).collect::<Vec<_>>();
+ let (target_reader_index, promotion) =
+ match info.writer_to_reader.first().and_then(|x| *x) {
+ Some(pair) => pair,
+ None => {
+ return Err(ArrowError::SchemaError(
+ "Writer schema does not match any reader union
branch".to_string(),
Review Comment:
"any" ? The code only looks at "first" ?
##########
arrow-avro/src/reader/record.rs:
##########
@@ -214,6 +254,148 @@ struct EnumResolution {
default_index: i32,
}
+#[derive(Debug, Clone, Copy)]
+enum BranchDispatch {
+ NoMatch,
+ ToReader {
+ reader_idx: usize,
+ promotion: Promotion,
+ },
+}
+
+#[derive(Debug)]
+struct UnionResolution {
+ dispatch: Option<Arc<[BranchDispatch]>>,
+ kind: UnionResolvedKind,
+}
+
+#[derive(Debug)]
+enum UnionResolvedKind {
+ Both {
+ reader_type_codes: Arc<[i8]>,
+ },
+ ToSingle {
+ target: Box<Decoder>,
+ },
+ FromSingle {
+ reader_type_codes: Arc<[i8]>,
+ target_reader_index: usize,
+ promotion: Promotion,
+ },
+}
+
+#[derive(Debug, Default)]
+struct UnionResolutionBuilder {
+ fields: Option<UnionFields>,
+ resolved: Option<ResolvedUnion>,
+}
+
+impl UnionResolutionBuilder {
+ #[inline]
+ fn new() -> Self {
+ Self {
+ fields: None,
+ resolved: None,
+ }
+ }
+
+ #[inline]
+ fn with_fields(mut self, fields: UnionFields) -> Self {
+ self.fields = Some(fields);
+ self
+ }
+
+ #[inline]
+ fn with_resolved_union(mut self, resolved_union: &ResolvedUnion) -> Self {
+ self.resolved = Some(resolved_union.clone());
+ self
+ }
+
+ fn build(self) -> Result<UnionResolution, ArrowError> {
+ let info = self.resolved.ok_or_else(|| {
+ ArrowError::InvalidArgumentError(
+ "UnionResolutionBuilder requires resolved_union to be
provided".to_string(),
+ )
+ })?;
+ match (info.writer_is_union, info.reader_is_union) {
+ (true, true) => {
+ let fields = self.fields.ok_or_else(|| {
+ ArrowError::InvalidArgumentError(
+ "UnionResolutionBuilder for reader union requires
fields".to_string(),
+ )
+ })?;
+ let reader_type_codes: Vec<i8> =
+ fields.iter().map(|(tid, _)| tid).collect::<Vec<_>>();
+ let dispatch: Vec<BranchDispatch> = info
+ .writer_to_reader
+ .iter()
+ .map(|m| match m {
+ Some((reader_index, promotion)) =>
BranchDispatch::ToReader {
+ reader_idx: *reader_index,
+ promotion: *promotion,
+ },
+ None => BranchDispatch::NoMatch,
+ })
+ .collect();
+ Ok(UnionResolution {
+ dispatch: Some(Arc::from(dispatch)),
+ kind: UnionResolvedKind::Both {
+ reader_type_codes: Arc::from(reader_type_codes),
+ },
+ })
+ }
+ (false, true) => {
+ let fields = self.fields.ok_or_else(|| {
+ ArrowError::InvalidArgumentError(
+ "UnionResolutionBuilder for reader union requires
fields".to_string(),
+ )
+ })?;
+ let reader_type_codes: Vec<i8> =
+ fields.iter().map(|(tid, _)| tid).collect::<Vec<_>>();
+ let (target_reader_index, promotion) =
+ match info.writer_to_reader.first().and_then(|x| *x) {
+ Some(pair) => pair,
+ None => {
+ return Err(ArrowError::SchemaError(
Review Comment:
Would this be simpler?
```suggestion
let Some(Some((target_reader_index, promotion))) = info.writer_to_reader.first() else {
    return Err(ArrowError::SchemaError(
```
##########
arrow-avro/src/reader/record.rs:
##########
@@ -486,6 +734,70 @@ impl Decoder {
Self::Decimal256(_, _, _, builder) =>
builder.append_value(i256::ZERO),
Self::Enum(indices, _, _) => indices.push(0),
Self::Duration(builder) => builder.append_null(),
+ Self::Union(fields, type_ids, offsets, encodings, encoding_counts,
None) => {
+ let mut chosen = None;
+ for (i, ch) in encodings.iter().enumerate() {
+ if matches!(ch, Decoder::Null(_)) {
+ chosen = Some(i);
+ break;
+ }
+ }
+ let idx = chosen.unwrap_or(0);
Review Comment:
```suggestion
let idx = encodings
.iter()
.position(|ch| matches!(ch, Decoder::Null(_)))
.unwrap_or(0);
```
(but why is defaulting to 0 correct?)
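
A standalone sketch of the `position` form, with a hypothetical `Branch` enum standing in for the decoder variants. It also makes the open question concrete: for an empty slice the fallback index 0 is out of bounds.

```rust
/// Hypothetical stand-in for the decoder variants.
#[derive(Debug)]
enum Branch {
    Null,
    Int,
    Str,
}

/// Index of the first `Null` branch, or 0 when there is none.
fn null_branch_or_first(branches: &[Branch]) -> usize {
    branches
        .iter()
        .position(|b| matches!(b, Branch::Null))
        .unwrap_or(0)
}
```

Returning the `Option<usize>` from `position` directly, rather than unwrapping to 0, would let the caller handle the empty case explicitly.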
##########
arrow-avro/src/reader/record.rs:
##########
@@ -834,8 +1205,88 @@ impl Decoder {
let nanos = (millis as i64) * 1_000_000;
builder.append_value(IntervalMonthDayNano::new(months as i32,
days as i32, nanos));
}
+ Self::Union(fields, type_ids, offsets, encodings, encoding_counts,
None) => {
+ let branch = buf.get_long()?;
+ if branch < 0 {
+ return Err(ArrowError::ParseError(format!(
+ "Negative union branch index {branch}"
+ )));
+ }
+ let idx = branch as usize;
+ if idx >= encodings.len() {
+ return Err(ArrowError::ParseError(format!(
+ "Union branch index {idx} out of range ({} branches)",
+ encodings.len()
+ )));
+ }
+ let type_id = fields
+ .iter()
+ .nth(idx)
+ .map(|(type_id, _)| type_id)
+ .unwrap_or_else(|| i8::try_from(idx).unwrap_or(0));
+ type_ids.push(type_id);
+ offsets.push(encoding_counts[idx]);
+ encodings[idx].decode(buf)?;
+ encoding_counts[idx] += 1;
Review Comment:
More redundancy with the previous block of code?
##########
arrow-avro/src/reader/record.rs:
##########
@@ -486,6 +734,70 @@ impl Decoder {
Self::Decimal256(_, _, _, builder) =>
builder.append_value(i256::ZERO),
Self::Enum(indices, _, _) => indices.push(0),
Self::Duration(builder) => builder.append_null(),
+ Self::Union(fields, type_ids, offsets, encodings, encoding_counts,
None) => {
+ let mut chosen = None;
+ for (i, ch) in encodings.iter().enumerate() {
+ if matches!(ch, Decoder::Null(_)) {
+ chosen = Some(i);
+ break;
+ }
+ }
+ let idx = chosen.unwrap_or(0);
+ let type_id = fields
+ .iter()
+ .nth(idx)
+ .map(|(type_id, _)| type_id)
+ .unwrap_or_else(|| i8::try_from(idx).unwrap_or(0));
+ type_ids.push(type_id);
+ offsets.push(encoding_counts[idx]);
+ encodings[idx].append_null();
+ encoding_counts[idx] += 1;
+ }
+ Self::Union(
+ fields,
+ type_ids,
+ offsets,
+ encodings,
+ encoding_counts,
+ Some(union_resolution),
+ ) => match &mut union_resolution.kind {
+ UnionResolvedKind::Both { .. } => {
+ let mut chosen = None;
+ for (i, ch) in encodings.iter().enumerate() {
+ if matches!(ch, Decoder::Null(_)) {
+ chosen = Some(i);
+ break;
+ }
+ }
+ let idx = chosen.unwrap_or(0);
+ let type_id = fields
+ .iter()
+ .nth(idx)
+ .map(|(type_id, _)| type_id)
+ .unwrap_or_else(|| i8::try_from(idx).unwrap_or(0));
Review Comment:
I don't understand why it's safe to interpret a field index as a type id?
AFAIK type ids are not required to be contiguous, nor ordered?
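
A tiny illustration of the concern, assuming (as the comment does) that type ids need not match positions. A slice of `(type_id, name)` pairs stands in for `UnionFields` here; the lookup returns the stored id or `None`, never the index reinterpreted as an id.

```rust
/// Looks up the type id stored at position `idx` instead of casting `idx`.
fn type_id_at(fields: &[(i8, &str)], idx: usize) -> Option<i8> {
    fields.get(idx).map(|(type_id, _)| *type_id)
}
```

With fields `[(5, "int"), (2, "string")]`, position 0 maps to type id 5, so falling back to `idx as i8` would emit an id that no branch uses.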
##########
arrow-avro/src/reader/record.rs:
##########
@@ -852,6 +1303,94 @@ impl Decoder {
Ok(())
}
+ fn decode_with_promotion(
+ &mut self,
+ buf: &mut AvroCursor<'_>,
+ promotion: Promotion,
+ ) -> Result<(), ArrowError> {
+ match promotion {
+ Promotion::Direct => self.decode(buf),
+ Promotion::IntToLong => match self {
+ Self::Int64(v) => {
+ v.push(buf.get_int()? as i64);
Review Comment:
Actually, a macro might really help here
```rust
Promotion::IntToLong => handle_promotion!(Int64, get_int),
Promotion::IntToFloat => handle_promotion!(Float32, get_int),
```
etc
(might need to either `impl Display for Promotion`, or pass the `"Int ->
Long"` type strings for error messages)
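
A rough sketch of what such a macro could look like, passing the label string explicitly. Everything here (`MiniDecoder`, `MiniCursor`, the extra macro arguments, `String` errors) is a simplified assumption for illustration, not the PR's code; the two-argument form above would need `self` and `buf` threaded in some other way.

```rust
#[derive(Debug)]
enum MiniDecoder {
    Int64(Vec<i64>),
    Float32(Vec<f32>),
}

/// Hypothetical cursor that always yields the same int.
struct MiniCursor(i32);

impl MiniCursor {
    fn get_int(&mut self) -> Result<i32, String> {
        Ok(self.0)
    }
}

// Expands to the match that each promotion arm currently spells out by hand.
macro_rules! handle_promotion {
    ($dec:expr, $buf:expr, $variant:ident, $getter:ident, $to:ty, $label:literal) => {
        match $dec {
            MiniDecoder::$variant(v) => {
                v.push($buf.$getter()? as $to);
                Ok(())
            }
            _ => Err(format!("Promotion {} target mismatch", $label)),
        }
    };
}

fn int_to_long(dec: &mut MiniDecoder, buf: &mut MiniCursor) -> Result<(), String> {
    handle_promotion!(dec, buf, Int64, get_int, i64, "Int->Long")
}

fn int_to_float(dec: &mut MiniDecoder, buf: &mut MiniCursor) -> Result<(), String> {
    handle_promotion!(dec, buf, Float32, get_int, f32, "Int->Float")
}
```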
##########
arrow-avro/src/reader/record.rs:
##########
@@ -486,6 +734,70 @@ impl Decoder {
Self::Decimal256(_, _, _, builder) =>
builder.append_value(i256::ZERO),
Self::Enum(indices, _, _) => indices.push(0),
Self::Duration(builder) => builder.append_null(),
+ Self::Union(fields, type_ids, offsets, encodings, encoding_counts,
None) => {
+ let mut chosen = None;
+ for (i, ch) in encodings.iter().enumerate() {
+ if matches!(ch, Decoder::Null(_)) {
+ chosen = Some(i);
+ break;
+ }
+ }
+ let idx = chosen.unwrap_or(0);
+ let type_id = fields
+ .iter()
+ .nth(idx)
+ .map(|(type_id, _)| type_id)
+ .unwrap_or_else(|| i8::try_from(idx).unwrap_or(0));
Review Comment:
Actually, maybe I do understand!
* If NULL is one of the union branches, emit that directly
* Otherwise, default to the first branch (whatever that is), and emit a NULL
value for it
Is that correct?
(but if so, then I don't understand why `idx` could ever be out of bounds?)
##########
arrow-avro/src/reader/record.rs:
##########
@@ -852,6 +1303,94 @@ impl Decoder {
Ok(())
}
+ fn decode_with_promotion(
+ &mut self,
+ buf: &mut AvroCursor<'_>,
+ promotion: Promotion,
+ ) -> Result<(), ArrowError> {
+ match promotion {
+ Promotion::Direct => self.decode(buf),
+ Promotion::IntToLong => match self {
+ Self::Int64(v) => {
+ v.push(buf.get_int()? as i64);
+ Ok(())
+ }
+ _ => Err(ArrowError::ParseError(
+ "Promotion Int->Long target mismatch".into(),
+ )),
+ },
+ Promotion::IntToFloat => match self {
+ Self::Float32(v) => {
+ v.push(buf.get_int()? as f32);
+ Ok(())
+ }
+ _ => Err(ArrowError::ParseError(
+ "Promotion Int->Float target mismatch".into(),
+ )),
+ },
+ Promotion::IntToDouble => match self {
+ Self::Float64(v) => {
+ v.push(buf.get_int()? as f64);
+ Ok(())
+ }
+ _ => Err(ArrowError::ParseError(
+ "Promotion Int->Double target mismatch".into(),
+ )),
+ },
+ Promotion::LongToFloat => match self {
+ Self::Float32(v) => {
+ v.push(buf.get_long()? as f32);
+ Ok(())
+ }
+ _ => Err(ArrowError::ParseError(
+ "Promotion Long->Float target mismatch".into(),
+ )),
+ },
+ Promotion::LongToDouble => match self {
Review Comment:
Technically these are narrowing/converting casts because f32 and f64
respectively have only 24 and 53 bits of precision (same problem casting i32 to
f32).
Does the spec require treating this as a "promotion" even tho it's lossy?
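
To make the precision point concrete, a small sketch (function names are made up) that checks whether an integer survives the cast exactly. The comparison is done in a wider integer type so that Rust's saturating float-to-int casts cannot mask a rounding error at the extremes.

```rust
/// True when `v` is exactly representable as an f64.
fn long_is_exact_as_double(v: i64) -> bool {
    // Compare in i128: `i64::MAX as f64` rounds up to 2^63, which would
    // saturate back to `i64::MAX` if cast to i64.
    (v as f64) as i128 == v as i128
}

/// True when `v` is exactly representable as an f32.
fn int_is_exact_as_float(v: i32) -> bool {
    (v as f32) as i64 == v as i64
}
```

Values up to 2^53 (for f64) and 2^24 (for f32) round-trip exactly; the first failures are 2^53 + 1 and 2^24 + 1 respectively.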