Jefffrey commented on code in PR #7962:
URL: https://github.com/apache/arrow-datafusion/pull/7962#discussion_r1375316184
##########
datafusion/core/src/datasource/file_format/arrow.rs:
##########
@@ -99,7 +102,140 @@ impl FileFormat for ArrowFormat {
}
}
-fn read_arrow_schema_from_reader<R: Read + Seek>(reader: R) ->
Result<SchemaRef> {
- let reader = FileReader::try_new(reader, None)?;
- Ok(reader.schema())
+const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
+const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];
+
+async fn infer_schema_from_file_stream(
+ mut stream: BoxStream<'static, object_store::Result<Bytes>>,
+) -> Result<SchemaRef> {
+ // Expected format:
+ // <magic number "ARROW1"> - 6 bytes
+ // <empty padding bytes [to 8 byte boundary]> - 2 bytes
+ // <continutation: 0xFFFFFFFF> - 4 bytes, not present below v0.15.0
+ // <metadata_size: int32> - 4 bytes
+ // <metadata_flatbuffer: bytes>
+ // <rest of file bytes>
+
+ // So in first read we need at least all known sized sections,
+ // which is 6 + 2 + 4 + 4 = 16 bytes.
+ let bytes = collect_at_least_n_bytes(&mut stream, 16, None).await?;
+ if bytes.len() < 16 {
+ return Err(ArrowError::ParseError(
+ "Arrow IPC file stream shorter than expected".to_string(),
+ ))?;
+ }
+
+ // Files should start with these magic bytes
+ if bytes[0..6] != ARROW_MAGIC {
+ return Err(ArrowError::ParseError(
+ "Arrow file does not contian correct header".to_string(),
+ ))?;
+ }
+
+ // Since continuation marker bytes added in later versions
+ let (meta_len, rest_of_bytes_start_index) = if bytes[8..12] ==
CONTINUATION_MARKER {
+ (&bytes[12..16], 16)
+ } else {
+ (&bytes[8..12], 12)
+ };
+
+ let meta_len = [meta_len[0], meta_len[1], meta_len[2], meta_len[3]];
+ let meta_len = i32::from_le_bytes(meta_len);
+
+ // Read bytes for Schema message
+ let block_data = if bytes[rest_of_bytes_start_index..].len() < meta_len as
usize {
+ // Need to read more bytes to decode Message
+ let mut block_data = Vec::with_capacity(meta_len as usize);
+ // In case we had some spare bytes in our initial read chunk
+ block_data.extend_from_slice(&bytes[rest_of_bytes_start_index..]);
+ let size_to_read = meta_len as usize - block_data.len();
+ let block_data =
+ collect_at_least_n_bytes(&mut stream, size_to_read,
Some(block_data)).await?;
Review Comment:
` collect_at_least_n_bytes()` should now do the length checking
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]