Fokko commented on code in PR #7873:
URL: https://github.com/apache/iceberg/pull/7873#discussion_r1251903156
##########
python/pyiceberg/avro/file.py:
##########
@@ -204,3 +214,58 @@ def __next__(self) -> D:
def _read_header(self) -> AvroFileHeader:
return construct_reader(META_SCHEMA, {-1: AvroFileHeader}).read(self.decoder)
+
+
+class AvroOutputFile(Generic[D]):
+ output_file: OutputFile
+ output_stream: OutputStream
+ schema: Schema
+ schema_name: str
+ encoder: BinaryEncoder
+ sync_bytes: bytes
+ writer: Writer
+
+ def __init__(self, output_file: OutputFile, schema: Schema, schema_name: str) -> None:
+ self.output_file = output_file
+ self.schema = schema
+ self.schema_name = schema_name
+ self.sync_bytes = os.urandom(SYNC_SIZE)
+ self.writer = construct_writer(self.schema)
+
+ def __enter__(self) -> AvroOutputFile[D]:
+ """
+ Opens the file and writes the header.
+
+ Returns:
+ The file object to write records to
+ """
+ self.output_stream = self.output_file.create(overwrite=True)
+ self.encoder = BinaryEncoder(self.output_stream)
+
+ self._write_header()
+ self.writer = construct_writer(self.schema)
+
+ return self
+
+ def __exit__(
+ self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
+ ) -> None:
+ """Performs cleanup when exiting the scope of a 'with' statement."""
+ self.output_stream.close()
+
+ def _write_header(self) -> None:
+ json_schema = json.dumps(AvroSchemaConversion().iceberg_to_avro(self.schema, schema_name=self.schema_name))
+ header = AvroFileHeader(magic=MAGIC, meta={_SCHEMA_KEY: json_schema, _CODEC_KEY: "null"}, sync=self.sync_bytes)
Review Comment:
Great catch @JonasJ-ap. I don't think these extra metadata fields are part of
the official specification, but they would make it easier to debug when we run
into any issues. Below is what the Java implementation writes.
For the manifest list:
```
avro-tools getmeta
snap-2072804327949133930-1-ed74160b-4ebc-4d3d-b2a4-67d0336d4b52.avro
avro.schema
{"type":"record","name":"manifest_file","fields":[{"name":"manifest_path","type":"string","doc":"Location
URI with FS
scheme","field-id":500},{"name":"manifest_length","type":"long","doc":"Total
file size in
bytes","field-id":501},{"name":"partition_spec_id","type":"int","doc":"Spec ID
used to
write","field-id":502},{"name":"added_snapshot_id","type":["null","long"],"doc":"Snapshot
ID that added the
manifest","default":null,"field-id":503},{"name":"added_data_files_count","type":["null","int"],"doc":"Added
entry
count","default":null,"field-id":504},{"name":"existing_data_files_count","type":["null","int"],"doc":"Existing
entry
count","default":null,"field-id":505},{"name":"deleted_data_files_count","type":["null","int"],"doc":"Deleted
entry
count","default":null,"field-id":506},{"name":"partitions","type":["null",{"type":"array","items":{"type":"record","name":"r508","fields":[{"name":"contains_null","type":"boolean","doc":"True
if any file has a null partition value","field-id":509},{"name":"contains_nan","type":["null","boolean"],"doc":"True
if any file has a nan partition
value","default":null,"field-id":518},{"name":"lower_bound","type":["null","bytes"],"doc":"Partition
lower bound for all
files","default":null,"field-id":510},{"name":"upper_bound","type":["null","bytes"],"doc":"Partition
upper bound for all
files","default":null,"field-id":511}]},"element-id":508}],"doc":"Summary for
each
partition","default":null,"field-id":507},{"name":"added_rows_count","type":["null","long"],"doc":"Added
rows
count","default":null,"field-id":512},{"name":"existing_rows_count","type":["null","long"],"doc":"Existing
rows
count","default":null,"field-id":513},{"name":"deleted_rows_count","type":["null","long"],"doc":"Deleted
rows count","default":null,"field-id":514}]}
avro.codec deflate
snapshot-id 2072804327949133930
format-version 1
iceberg.schema
{"type":"struct","schema-id":0,"fields":[{"id":500,"name":"manifest_path","required":true,"type":"string","doc":"Location
URI with FS
scheme"},{"id":501,"name":"manifest_length","required":true,"type":"long","doc":"Total
file size in
bytes"},{"id":502,"name":"partition_spec_id","required":true,"type":"int","doc":"Spec
ID used to
write"},{"id":503,"name":"added_snapshot_id","required":false,"type":"long","doc":"Snapshot
ID that added the
manifest"},{"id":504,"name":"added_data_files_count","required":false,"type":"int","doc":"Added
entry
count"},{"id":505,"name":"existing_data_files_count","required":false,"type":"int","doc":"Existing
entry
count"},{"id":506,"name":"deleted_data_files_count","required":false,"type":"int","doc":"Deleted
entry
count"},{"id":507,"name":"partitions","required":false,"type":{"type":"list","element-id":508,"element":{"type":"struct","fields":[{"id":509,"name":"contains_null","required":true,"type":"boolean","doc":"True
if any file has a null partition
value"},{"id":518,"name":"contains_nan","required":false,"type":"boolean","doc":"True
if any file has a nan partition
value"},{"id":510,"name":"lower_bound","required":false,"type":"binary","doc":"Partition
lower bound for all
files"},{"id":511,"name":"upper_bound","required":false,"type":"binary","doc":"Partition
upper bound for all files"}]},"element-required":true},"doc":"Summary for each
partition"},{"id":512,"name":"added_rows_count","required":false,"type":"long","doc":"Added
rows
count"},{"id":513,"name":"existing_rows_count","required":false,"type":"long","doc":"Existing
rows
count"},{"id":514,"name":"deleted_rows_count","required":false,"type":"long","doc":"Deleted
rows count"}]}
parent-snapshot-id 8418559943328617687
```
And for the manifest itself:
```
avro-tools getmeta 6261db11-d596-429a-9474-f45ca0a37a21-m0.avro
23/07/04 13:31:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
schema
{"type":"struct","schema-id":0,"fields":[{"id":1,"name":"VendorID","required":false,"type":"long"},{"id":2,"name":"tpep_pickup_datetime","required":false,"type":"timestamptz"},{"id":3,"name":"tpep_dropoff_datetime","required":false,"type":"timestamptz"},{"id":4,"name":"passenger_count","required":false,"type":"double"},{"id":5,"name":"trip_distance","required":false,"type":"double"},{"id":6,"name":"RatecodeID","required":false,"type":"double"},{"id":7,"name":"store_and_fwd_flag","required":false,"type":"string"},{"id":8,"name":"PULocationID","required":false,"type":"long"},{"id":9,"name":"DOLocationID","required":false,"type":"long"},{"id":10,"name":"payment_type","required":false,"type":"long"},{"id":11,"name":"fare_amount","required":false,"type":"double"},{"id":12,"name":"extra","required":false,"type":"double"},{"id":13,"name":"mta_tax","required":false,"type":"double"},{"id":14,"name":"tip_amount","required":false,"type":"double"},{"id":15,"name":"tolls_amount","required":false,"type":"double"},{"id":16,"name":"improvement_surcharge","required":false,"type":"double"},{"id":17,"name":"total_amount","required":false,"type":"double"},{"id":18,"name":"congestion_surcharge","required":false,"type":"double"},{"id":19,"name":"airport_fee","required":false,"type":"double"}]}
avro.schema
{"type":"record","name":"manifest_entry","fields":[{"name":"status","type":"int","field-id":0},{"name":"snapshot_id","type":["null","long"],"default":null,"field-id":1},{"name":"data_file","type":{"type":"record","name":"r2","fields":[{"name":"file_path","type":"string","doc":"Location
URI with FS
scheme","field-id":100},{"name":"file_format","type":"string","doc":"File
format name: avro, orc, or
parquet","field-id":101},{"name":"partition","type":{"type":"record","name":"r102","fields":[{"name":"tpep_pickup_datetime_day","type":["null",{"type":"int","logicalType":"date"}],"default":null,"field-id":1000}]},"field-id":102},{"name":"record_count","type":"long","doc":"Number
of records in the
file","field-id":103},{"name":"file_size_in_bytes","type":"long","doc":"Total
file size in
bytes","field-id":104},{"name":"block_size_in_bytes","type":"long","field-id":105},{"name":"column_sizes","type":["null",{"type":"array","items":{"type":"record","name":"k117_v118","fields":[{"name":"key","type":"int","field-id":117},{"name":"value","type":"long","field-id":118}]},"logicalType":"map"}],"doc":"Map
of column id to total size on
disk","default":null,"field-id":108},{"name":"value_counts","type":["null",{"type":"array","items":{"type":"record","name":"k119_v120","fields":[{"name":"key","type":"int","field-id":119},{"name":"value","type":"long","field-id":120}]},"logicalType":"map"}],"doc":"Map
of column id to total count, including null and
NaN","default":null,"field-id":109},{"name":"null_value_counts","type":["null",{"type":"array","items":{"type":"record","name":"k121_v122","fields":[{"name":"key","type":"int","field-id":121},{"name":"value","type":"long","field-id":122}]},"logicalType":"map"}],"doc":"Map
of column id to null value
count","default":null,"field-id":110},{"name":"nan_value_counts","type":["null",{"type":"array","items":{"type":"record","name":"k138_v139","fields":[{"name":"key","type":"int","field-id":138},{"name":"value","type":"long","field-id":139}]},"logicalType":"map"}],"doc":"Map of column id to number of NaN
values in the
column","default":null,"field-id":137},{"name":"lower_bounds","type":["null",{"type":"array","items":{"type":"record","name":"k126_v127","fields":[{"name":"key","type":"int","field-id":126},{"name":"value","type":"bytes","field-id":127}]},"logicalType":"map"}],"doc":"Map
of column id to lower
bound","default":null,"field-id":125},{"name":"upper_bounds","type":["null",{"type":"array","items":{"type":"record","name":"k129_v130","fields":[{"name":"key","type":"int","field-id":129},{"name":"value","type":"bytes","field-id":130}]},"logicalType":"map"}],"doc":"Map
of column id to upper
bound","default":null,"field-id":128},{"name":"key_metadata","type":["null","bytes"],"doc":"Encryption
key metadata
blob","default":null,"field-id":131},{"name":"split_offsets","type":["null",{"type":"array","items":"long","element-id":133}],"doc":"Splittable
offsets","default":null,"field-id":132},{"name":"sort_order_id","type":["null","int"],"doc":"Sort order
ID","default":null,"field-id":140}]},"field-id":2}]}
avro.codec deflate
format-version 1
partition-spec-id 0
iceberg.schema
{"type":"struct","schema-id":0,"fields":[{"id":0,"name":"status","required":true,"type":"int"},{"id":1,"name":"snapshot_id","required":false,"type":"long"},{"id":2,"name":"data_file","required":true,"type":{"type":"struct","fields":[{"id":100,"name":"file_path","required":true,"type":"string","doc":"Location
URI with FS
scheme"},{"id":101,"name":"file_format","required":true,"type":"string","doc":"File
format name: avro, orc, or
parquet"},{"id":102,"name":"partition","required":true,"type":{"type":"struct","fields":[{"id":1000,"name":"tpep_pickup_datetime_day","required":false,"type":"date"}]}},{"id":103,"name":"record_count","required":true,"type":"long","doc":"Number
of records in the
file"},{"id":104,"name":"file_size_in_bytes","required":true,"type":"long","doc":"Total
file size in
bytes"},{"id":105,"name":"block_size_in_bytes","required":true,"type":"long"},{"id":108,"name":"column_sizes","required":false,"type":{"type":"map","key-id":117,"key":"int","value-id":118,"value":"long","value-required":true},"doc":"Map of column id to total
size on
disk"},{"id":109,"name":"value_counts","required":false,"type":{"type":"map","key-id":119,"key":"int","value-id":120,"value":"long","value-required":true},"doc":"Map
of column id to total count, including null and
NaN"},{"id":110,"name":"null_value_counts","required":false,"type":{"type":"map","key-id":121,"key":"int","value-id":122,"value":"long","value-required":true},"doc":"Map
of column id to null value
count"},{"id":137,"name":"nan_value_counts","required":false,"type":{"type":"map","key-id":138,"key":"int","value-id":139,"value":"long","value-required":true},"doc":"Map
of column id to number of NaN values in the
column"},{"id":125,"name":"lower_bounds","required":false,"type":{"type":"map","key-id":126,"key":"int","value-id":127,"value":"binary","value-required":true},"doc":"Map
of column id to lower
bound"},{"id":128,"name":"upper_bounds","required":false,"type":{"type":"map","key-id":129,"key":"int","value-id":130,"value":"binary","value-required":true},"doc":"Map of
column id to upper
bound"},{"id":131,"name":"key_metadata","required":false,"type":"binary","doc":"Encryption
key metadata
blob"},{"id":132,"name":"split_offsets","required":false,"type":{"type":"list","element-id":133,"element":"long","element-required":true},"doc":"Splittable
offsets"},{"id":140,"name":"sort_order_id","required":false,"type":"int","doc":"Sort
order ID"}]}}]}
partition-spec
[{"name":"tpep_pickup_datetime_day","transform":"day","source-id":2,"field-id":1000}]
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]