alamb commented on code in PR #8413:
URL: https://github.com/apache/arrow-datafusion/pull/8413#discussion_r1418071917
##########
datafusion-cli/src/functions.rs:
##########
@@ -196,3 +209,197 @@ pub fn display_all_functions() -> Result<()> {
println!("{}", pretty_format_batches(&[batch]).unwrap());
Ok(())
}
+
+/// PARQUET_META table function
+struct ParquetMetadataTable {
+ schema: SchemaRef,
+ batch: RecordBatch,
+}
+
+#[async_trait]
+impl TableProvider for ParquetMetadataTable {
+ fn as_any(&self) -> &dyn std::any::Any {
+ self
+ }
+
+ fn schema(&self) -> arrow::datatypes::SchemaRef {
+ self.schema.clone()
+ }
+
+ fn table_type(&self) -> datafusion::logical_expr::TableType {
+ datafusion::logical_expr::TableType::Base
+ }
+
+ async fn scan(
+ &self,
+ _state: &SessionState,
+ projection: Option<&Vec<usize>>,
+ _filters: &[Expr],
+ _limit: Option<usize>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Ok(Arc::new(MemoryExec::try_new(
+ &[vec![self.batch.clone()]],
+ TableProvider::schema(self),
+ projection.cloned(),
+ )?))
+ }
+}
+
+pub struct ParquetMetadataFunc {}
+
+impl TableFunctionImpl for ParquetMetadataFunc {
+ fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
+ let Some(Expr::Column(Column { name, .. })) = exprs.get(0) else {
+            return plan_err!("parquet_metadata requires string argument as its input");
+ };
+
+ let file = File::open(name.clone())?;
+ let reader = SerializedFileReader::new(file)?;
+ let metadata = reader.metadata();
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("filename", DataType::Utf8, true),
+ Field::new("row_group_id", DataType::Int64, true),
+ Field::new("row_group_num_rows", DataType::Int64, true),
+ Field::new("row_group_num_columns", DataType::Int64, true),
+ Field::new("row_group_bytes", DataType::Int64, true),
+ Field::new("column_id", DataType::Int64, true),
+ Field::new("file_offset", DataType::Int64, true),
+ Field::new("num_values", DataType::Int64, true),
+ Field::new("path_in_schema", DataType::Utf8, true),
+ Field::new("type", DataType::Utf8, true),
+ Field::new("stats_min", DataType::Utf8, true),
+ Field::new("stats_max", DataType::Utf8, true),
+ Field::new("stats_null_count", DataType::Int64, true),
+ Field::new("stats_distinct_count", DataType::Int64, true),
+ Field::new("stats_min_value", DataType::Utf8, true),
+ Field::new("stats_max_value", DataType::Utf8, true),
+ Field::new("compression", DataType::Utf8, true),
+ Field::new("encodings", DataType::Utf8, true),
+ Field::new("index_page_offset", DataType::Int64, true),
+ Field::new("dictionary_page_offset", DataType::Int64, true),
+ Field::new("data_page_offset", DataType::Int64, true),
+ Field::new("total_compressed_size", DataType::Int64, true),
+ Field::new("total_uncompressed_size", DataType::Int64, true),
+ ]));
+
+ // construct recordbatch from metadata
+ let mut filename_arr = vec![];
+ let mut row_group_id_arr = vec![];
+ let mut row_group_num_rows_arr = vec![];
+ let mut row_group_num_columns_arr = vec![];
+ let mut row_group_bytes_arr = vec![];
+ let mut column_id_arr = vec![];
+ let mut file_offset_arr = vec![];
+ let mut num_values_arr = vec![];
+ let mut path_in_schema_arr = vec![];
+ let mut type_arr = vec![];
+ let mut stats_min_arr = vec![];
+ let mut stats_max_arr = vec![];
+ let mut stats_null_count_arr = vec![];
+ let mut stats_distinct_count_arr = vec![];
+ let mut stats_min_value_arr = vec![];
+ let mut stats_max_value_arr = vec![];
+ let mut compression_arr = vec![];
+ let mut encodings_arr = vec![];
+ let mut index_page_offset_arr = vec![];
+ let mut dictionary_page_offset_arr = vec![];
+ let mut data_page_offset_arr = vec![];
+ let mut total_compressed_size_arr = vec![];
+ let mut total_uncompressed_size_arr = vec![];
+ for (rg_idx, row_group) in metadata.row_groups().iter().enumerate() {
+ for (col_idx, column) in row_group.columns().iter().enumerate() {
+ filename_arr.push(name.clone());
+ row_group_id_arr.push(rg_idx as i64);
+ row_group_num_rows_arr.push(row_group.num_rows());
+ row_group_num_columns_arr.push(row_group.num_columns() as i64);
+ row_group_bytes_arr.push(row_group.total_byte_size());
+ column_id_arr.push(col_idx as i64);
+ file_offset_arr.push(column.file_offset());
+ num_values_arr.push(column.num_values());
+ path_in_schema_arr.push(column.column_path().to_string());
+ type_arr.push(column.column_type().to_string());
+ if let Some(s) = column.statistics() {
Review Comment:
For some reason, I think you also have to check `has_min_max_set` before
calling min/max
https://docs.rs/parquet/latest/parquet/file/statistics/enum.Statistics.html#method.has_min_max_set
(this is what is causing the panic I think)
##########
datafusion-cli/src/main.rs:
##########
@@ -185,6 +186,8 @@ pub async fn main() -> Result<()> {
ctx.state().catalog_list(),
ctx.state_weak_ref(),
)));
+    // register `parquet_metadata` table function to get metadata from parquet files
+ ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {}));
Review Comment:
🥳
##########
datafusion-cli/src/functions.rs:
##########
@@ -196,3 +209,197 @@ pub fn display_all_functions() -> Result<()> {
println!("{}", pretty_format_batches(&[batch]).unwrap());
Ok(())
}
+
+/// PARQUET_META table function
+struct ParquetMetadataTable {
+ schema: SchemaRef,
+ batch: RecordBatch,
+}
+
+#[async_trait]
+impl TableProvider for ParquetMetadataTable {
+ fn as_any(&self) -> &dyn std::any::Any {
+ self
+ }
+
+ fn schema(&self) -> arrow::datatypes::SchemaRef {
+ self.schema.clone()
+ }
+
+ fn table_type(&self) -> datafusion::logical_expr::TableType {
+ datafusion::logical_expr::TableType::Base
+ }
+
+ async fn scan(
+ &self,
+ _state: &SessionState,
+ projection: Option<&Vec<usize>>,
+ _filters: &[Expr],
+ _limit: Option<usize>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Ok(Arc::new(MemoryExec::try_new(
+ &[vec![self.batch.clone()]],
+ TableProvider::schema(self),
+ projection.cloned(),
+ )?))
+ }
+}
+
+pub struct ParquetMetadataFunc {}
+
+impl TableFunctionImpl for ParquetMetadataFunc {
+ fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
+ let Some(Expr::Column(Column { name, .. })) = exprs.get(0) else {
Review Comment:
   If this also handled `ScalarValue::String` here, it would let
   `read_metadata('file')` (with single quotes `'`) work.
##########
datafusion-cli/src/main.rs:
##########
@@ -185,6 +186,8 @@ pub async fn main() -> Result<()> {
ctx.state().catalog_list(),
ctx.state_weak_ref(),
)));
+    // register `parquet_metadata` table function to get metadata from parquet files
+ ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {}));
Review Comment:
🥳
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]