martin-g commented on code in PR #18457:
URL: https://github.com/apache/datafusion/pull/18457#discussion_r2509842910
##########
datafusion/datasource-arrow/src/file_format.rs:
##########
@@ -349,94 +376,122 @@ impl DataSink for ArrowFileSink {
}
}
+// Custom implementation of inferring schema. Should eventually be moved
upstream to arrow-rs.
+// See <https://github.com/apache/arrow-rs/issues/5021>
+
const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];
-/// Custom implementation of inferring schema. Should eventually be moved
upstream to arrow-rs.
-/// See <https://github.com/apache/arrow-rs/issues/5021>
-async fn infer_schema_from_file_stream(
+async fn infer_stream_schema(
mut stream: BoxStream<'static, object_store::Result<Bytes>>,
) -> Result<SchemaRef> {
- // Expected format:
- // <magic number "ARROW1"> - 6 bytes
- // <empty padding bytes [to 8 byte boundary]> - 2 bytes
- // <continuation: 0xFFFFFFFF> - 4 bytes, not present below v0.15.0
- // <metadata_size: int32> - 4 bytes
- // <metadata_flatbuffer: bytes>
- // <rest of file bytes>
-
- // So in first read we need at least all known sized sections,
- // which is 6 + 2 + 4 + 4 = 16 bytes.
- let bytes = collect_at_least_n_bytes(&mut stream, 16, None).await?;
-
- // Files should start with these magic bytes
- if bytes[0..6] != ARROW_MAGIC {
- return Err(ArrowError::ParseError(
- "Arrow file does not contain correct header".to_string(),
- ))?;
- }
-
- // Since continuation marker bytes added in later versions
- let (meta_len, rest_of_bytes_start_index) = if bytes[8..12] ==
CONTINUATION_MARKER {
- (&bytes[12..16], 16)
+ // 16 bytes covers the preamble and metadata length no matter
+ // which version or format is used
+ let bytes = extend_bytes_to_n_length_from_stream(vec![], 16, &mut
stream).await?;
+
+ // The preamble length is everything before the metadata length
+ let preamble_len = if bytes[0..6] == ARROW_MAGIC {
+ // File format starts with magic number "ARROW1"
+ if bytes[8..12] == CONTINUATION_MARKER {
+ // Continuation marker was added in v0.15.0
+ 12
+ } else {
+ // File format before v0.15.0
+ 8
+ }
+ } else if bytes[0..4] == CONTINUATION_MARKER {
+ // Stream format after v0.15.0 starts with continuation marker
+ 4
} else {
- (&bytes[8..12], 12)
+ // Stream format before v0.15.0 does not have a preamble
+ 0
};
- let meta_len = [meta_len[0], meta_len[1], meta_len[2], meta_len[3]];
- let meta_len = i32::from_le_bytes(meta_len);
-
- // Read bytes for Schema message
- let block_data = if bytes[rest_of_bytes_start_index..].len() < meta_len as
usize {
- // Need to read more bytes to decode Message
- let mut block_data = Vec::with_capacity(meta_len as usize);
- // In case we had some spare bytes in our initial read chunk
- block_data.extend_from_slice(&bytes[rest_of_bytes_start_index..]);
- let size_to_read = meta_len as usize - block_data.len();
- let block_data =
- collect_at_least_n_bytes(&mut stream, size_to_read,
Some(block_data)).await?;
- Cow::Owned(block_data)
- } else {
- // Already have the bytes we need
- let end_index = meta_len as usize + rest_of_bytes_start_index;
- let block_data = &bytes[rest_of_bytes_start_index..end_index];
- Cow::Borrowed(block_data)
- };
+ let meta_len_bytes: [u8; 4] = bytes[preamble_len..preamble_len + 4]
+ .try_into()
+ .map_err(|err| {
+ ArrowError::ParseError(format!(
+ "Unable to read IPC message metadata length: {err:?}"
+ ))
+ })?;
+
+ let meta_len = i32::from_le_bytes([
+ meta_len_bytes[0],
+ meta_len_bytes[1],
+ meta_len_bytes[2],
+ meta_len_bytes[3],
+ ]);
+
+ if meta_len < 0 {
+ return Err(ArrowError::ParseError(
+ "IPC message metadata length is negative".to_string(),
+ )
+ .into());
+ }
+
+ let bytes = extend_bytes_to_n_length_from_stream(
+ bytes,
+ preamble_len + 4 + (meta_len as usize),
+ &mut stream,
+ )
+ .await?;
- // Decode Schema message
- let message = root_as_message(&block_data).map_err(|err| {
- ArrowError::ParseError(format!("Unable to read IPC message as
metadata: {err:?}"))
+ let message = root_as_message(&bytes[preamble_len + 4..]).map_err(|err| {
+ ArrowError::ParseError(format!("Unable to read IPC message metadata:
{err:?}"))
})?;
- let ipc_schema = message.header_as_schema().ok_or_else(|| {
- ArrowError::IpcError("Unable to read IPC message as
schema".to_string())
+ let fb_schema = message.header_as_schema().ok_or_else(|| {
+ ArrowError::IpcError("Unable to read IPC message schema".to_string())
})?;
- let schema = fb_to_schema(ipc_schema);
+ let schema = fb_to_schema(fb_schema);
Ok(Arc::new(schema))
}
-async fn collect_at_least_n_bytes(
- stream: &mut BoxStream<'static, object_store::Result<Bytes>>,
+async fn extend_bytes_to_n_length_from_stream(
+ bytes: Vec<u8>,
n: usize,
- extend_from: Option<Vec<u8>>,
+ stream: &mut BoxStream<'static, object_store::Result<Bytes>>,
) -> Result<Vec<u8>> {
- let mut buf = extend_from.unwrap_or_else(|| Vec::with_capacity(n));
- // If extending existing buffer then ensure we read n additional bytes
- let n = n + buf.len();
- while let Some(bytes) = stream.next().await.transpose()? {
- buf.extend_from_slice(&bytes);
+ if bytes.len() >= n {
+ return Ok(bytes);
+ }
+
+ let mut buf = bytes;
+
+ while let Some(b) = stream.next().await.transpose()? {
+ buf.extend_from_slice(&b);
+
if buf.len() >= n {
break;
}
}
+
if buf.len() < n {
return Err(ArrowError::ParseError(
"Unexpected end of byte stream for Arrow IPC file".to_string(),
- ))?;
+ )
+ .into());
}
+
Ok(buf)
}
+async fn is_object_in_arrow_ipc_file_format(
+ store: Arc<dyn ObjectStore>,
+ object_location: &Path,
+) -> Result<bool> {
+ let get_opts = GetOptions {
+ range: Some(GetRange::Bounded(0..6)),
+ ..Default::default()
+ };
+ let bytes = store
+ .get_opts(object_location, get_opts)
Review Comment:
Does this return an error if the file is shorter than 6 bytes?
According to
https://docs.rs/object_store/latest/object_store/struct.GetOptions.html#structfield.range
it returns
https://docs.rs/object_store/latest/object_store/enum.Error.html#variant.NotModified
If this error is indeed returned, then the `bytes.len() >= 6` check below is
not really needed. It actually confuses the maintainer by suggesting that
`bytes` could be shorter than 6 bytes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]