roeap commented on code in PR #4296: URL: https://github.com/apache/arrow-rs/pull/4296#discussion_r1212269660
########## arrow-flight/src/sql/catalogs/tables.rs: ########## @@ -0,0 +1,466 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`GetTablesBuilder`] for building responses to [`CommandGetTables`] queries. +//! +//! [`CommandGetTables`]: crate::sql::CommandGetTables + +use std::sync::Arc; + +use arrow_arith::boolean::{and, or}; +use arrow_array::builder::{BinaryBuilder, StringBuilder}; +use arrow_array::{ArrayRef, RecordBatch}; +use arrow_ord::comparison::eq_utf8_scalar; +use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use arrow_select::{filter::filter_record_batch, take::take}; +use arrow_string::like::like_utf8_scalar; +use once_cell::sync::Lazy; + +use super::lexsort_to_indices; +use crate::error::*; +use crate::sql::CommandGetTables; +use crate::{IpcMessage, IpcWriteOptions, SchemaAsIpc}; + +/// Return the schema of the RecordBatch that will be returned from [`CommandGetTables`] +/// +/// Note the schema differs based on the values of `include_schema +/// +/// [`CommandGetTables`]: crate::sql::CommandGetTables +pub fn get_tables_schema(include_schema: bool) -> SchemaRef { + if include_schema { + Arc::clone(&GET_TABLES_SCHEMA_WITH_TABLE_SCHEMA) + } else { + 
Arc::clone(&GET_TABLES_SCHEMA_WITHOUT_TABLE_SCHEMA) + } +} + +/// Builds rows like this: +/// +/// * catalog_name: utf8, +/// * db_schema_name: utf8, +/// * table_name: utf8 not null, +/// * table_type: utf8 not null, +/// * (optional) table_schema: bytes not null (schema of the table as described +/// in Schema.fbs::Schema it is serialized as an IPC message.) +pub struct GetTablesBuilder { + catalog_filter: Option<String>, + table_types_filter: Vec<String>, + // Optional filters to apply to schemas + db_schema_filter_pattern: Option<String>, + // Optional filters to apply to tables + table_name_filter_pattern: Option<String>, + // array builder for catalog names + catalog_name: StringBuilder, + // array builder for db schema names + db_schema_name: StringBuilder, + // array builder for tables names + table_name: StringBuilder, + // array builder for table types + table_type: StringBuilder, + // array builder for table schemas + table_schema: Option<BinaryBuilder>, +} + +impl CommandGetTables { + pub fn into_builder(self) -> GetTablesBuilder { + self.into() + } +} + +impl From<CommandGetTables> for GetTablesBuilder { + fn from(value: CommandGetTables) -> Self { + Self::new( + value.catalog, + value.db_schema_filter_pattern, + value.table_name_filter_pattern, + value.table_types, + value.include_schema, + ) + } +} + +impl GetTablesBuilder { + /// Create a new instance of [`GetTablesBuilder`] + /// + /// # Paramneters + /// + /// - `catalog`: Specifies the Catalog to search for the tables. + /// - An empty string retrieves those without a catalog. + /// - If omitted the catalog name is not used to narrow the search. + /// - `db_schema_filter_pattern`: Specifies a filter pattern for schemas to search for. + /// When no pattern is provided, the pattern will not be used to narrow the search. + /// In the pattern string, two special characters can be used to denote matching rules: + /// - "%" means to match any substring with 0 or more characters. 
+ /// - "_" means to match any one character. + /// - `table_name_filter_pattern`: Specifies a filter pattern for tables to search for. + /// When no pattern is provided, all tables matching other filters are searched. + /// In the pattern string, two special characters can be used to denote matching rules: + /// - "%" means to match any substring with 0 or more characters. + /// - "_" means to match any one character. + /// - `table_types`: Specifies a filter of table types which must match. + /// An empy Vec matches all table types. + /// - `include_schema`: Specifies if the Arrow schema should be returned for found tables. + /// + /// [`CommandGetTables`]: crate::sql::CommandGetTables + pub fn new( + catalog: Option<impl Into<String>>, + db_schema_filter_pattern: Option<impl Into<String>>, + table_name_filter_pattern: Option<impl Into<String>>, + table_types: impl IntoIterator<Item = impl Into<String>>, + include_schema: bool, + ) -> Self { + let table_schema = if include_schema { + Some(BinaryBuilder::new()) + } else { + None + }; + Self { + catalog_filter: catalog.map(|s| s.into()), + table_types_filter: table_types.into_iter().map(|tt| tt.into()).collect(), + db_schema_filter_pattern: db_schema_filter_pattern.map(|s| s.into()), + table_name_filter_pattern: table_name_filter_pattern.map(|t| t.into()), + catalog_name: StringBuilder::new(), + db_schema_name: StringBuilder::new(), + table_name: StringBuilder::new(), + table_type: StringBuilder::new(), + table_schema, + } + } + + /// Append a row + pub fn append( + &mut self, + catalog_name: impl AsRef<str>, Review Comment: I was not completely sure how this would be expected. The filter distinguishes `None` which means no filtering vs `""` which means empty catalog. So in order to specify an empty catalog, one would pass in "". 
I thought about making this optional and then testing for nulls in the filter, but in the end I felt it's trivial for the caller to default to an empty string, so handling nulls in the catalog is not necessary. ########## arrow-flight/src/sql/catalogs/tables.rs: ########## @@ -0,0 +1,466 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`GetTablesBuilder`] for building responses to [`CommandGetTables`] queries. +//! +//! 
[`CommandGetTables`]: crate::sql::CommandGetTables + +use std::sync::Arc; + +use arrow_arith::boolean::{and, or}; +use arrow_array::builder::{BinaryBuilder, StringBuilder}; +use arrow_array::{ArrayRef, RecordBatch}; +use arrow_ord::comparison::eq_utf8_scalar; +use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use arrow_select::{filter::filter_record_batch, take::take}; +use arrow_string::like::like_utf8_scalar; +use once_cell::sync::Lazy; + +use super::lexsort_to_indices; +use crate::error::*; +use crate::sql::CommandGetTables; +use crate::{IpcMessage, IpcWriteOptions, SchemaAsIpc}; + +/// Return the schema of the RecordBatch that will be returned from [`CommandGetTables`] +/// +/// Note the schema differs based on the values of `include_schema +/// +/// [`CommandGetTables`]: crate::sql::CommandGetTables +pub fn get_tables_schema(include_schema: bool) -> SchemaRef { + if include_schema { + Arc::clone(&GET_TABLES_SCHEMA_WITH_TABLE_SCHEMA) + } else { + Arc::clone(&GET_TABLES_SCHEMA_WITHOUT_TABLE_SCHEMA) + } +} + +/// Builds rows like this: +/// +/// * catalog_name: utf8, +/// * db_schema_name: utf8, +/// * table_name: utf8 not null, +/// * table_type: utf8 not null, +/// * (optional) table_schema: bytes not null (schema of the table as described +/// in Schema.fbs::Schema it is serialized as an IPC message.) 
+pub struct GetTablesBuilder { + catalog_filter: Option<String>, + table_types_filter: Vec<String>, + // Optional filters to apply to schemas + db_schema_filter_pattern: Option<String>, + // Optional filters to apply to tables + table_name_filter_pattern: Option<String>, + // array builder for catalog names + catalog_name: StringBuilder, + // array builder for db schema names + db_schema_name: StringBuilder, + // array builder for tables names + table_name: StringBuilder, + // array builder for table types + table_type: StringBuilder, + // array builder for table schemas + table_schema: Option<BinaryBuilder>, +} + +impl CommandGetTables { + pub fn into_builder(self) -> GetTablesBuilder { + self.into() + } +} + +impl From<CommandGetTables> for GetTablesBuilder { + fn from(value: CommandGetTables) -> Self { + Self::new( + value.catalog, + value.db_schema_filter_pattern, + value.table_name_filter_pattern, + value.table_types, + value.include_schema, + ) + } +} + +impl GetTablesBuilder { + /// Create a new instance of [`GetTablesBuilder`] + /// + /// # Paramneters + /// + /// - `catalog`: Specifies the Catalog to search for the tables. + /// - An empty string retrieves those without a catalog. + /// - If omitted the catalog name is not used to narrow the search. + /// - `db_schema_filter_pattern`: Specifies a filter pattern for schemas to search for. + /// When no pattern is provided, the pattern will not be used to narrow the search. + /// In the pattern string, two special characters can be used to denote matching rules: + /// - "%" means to match any substring with 0 or more characters. + /// - "_" means to match any one character. + /// - `table_name_filter_pattern`: Specifies a filter pattern for tables to search for. + /// When no pattern is provided, all tables matching other filters are searched. + /// In the pattern string, two special characters can be used to denote matching rules: + /// - "%" means to match any substring with 0 or more characters. 
+ /// - "_" means to match any one character. + /// - `table_types`: Specifies a filter of table types which must match. + /// An empy Vec matches all table types. + /// - `include_schema`: Specifies if the Arrow schema should be returned for found tables. + /// + /// [`CommandGetTables`]: crate::sql::CommandGetTables + pub fn new( + catalog: Option<impl Into<String>>, + db_schema_filter_pattern: Option<impl Into<String>>, + table_name_filter_pattern: Option<impl Into<String>>, + table_types: impl IntoIterator<Item = impl Into<String>>, + include_schema: bool, + ) -> Self { + let table_schema = if include_schema { + Some(BinaryBuilder::new()) + } else { + None + }; + Self { + catalog_filter: catalog.map(|s| s.into()), + table_types_filter: table_types.into_iter().map(|tt| tt.into()).collect(), + db_schema_filter_pattern: db_schema_filter_pattern.map(|s| s.into()), + table_name_filter_pattern: table_name_filter_pattern.map(|t| t.into()), + catalog_name: StringBuilder::new(), + db_schema_name: StringBuilder::new(), + table_name: StringBuilder::new(), + table_type: StringBuilder::new(), + table_schema, + } + } + + /// Append a row + pub fn append( + &mut self, + catalog_name: impl AsRef<str>, + schema_name: impl AsRef<str>, + table_name: impl AsRef<str>, + table_type: impl AsRef<str>, + table_schema: &Schema, + ) -> Result<()> { + self.catalog_name.append_value(catalog_name); + self.db_schema_name.append_value(schema_name); + self.table_name.append_value(table_name); + self.table_type.append_value(table_type); + if let Some(self_table_schema) = self.table_schema.as_mut() { + let options = IpcWriteOptions::default(); + // encode the schema into the correct form + let message: std::result::Result<IpcMessage, _> = + SchemaAsIpc::new(table_schema, &options).try_into(); + let IpcMessage(schema) = message?; + self_table_schema.append_value(schema); + } + + Ok(()) + } + + /// builds a `RecordBatch` for `CommandGetTables` + pub fn build(self) -> Result<RecordBatch> { + let 
Self { + catalog_filter, + table_types_filter, + db_schema_filter_pattern, + table_name_filter_pattern, + + mut catalog_name, + mut db_schema_name, + mut table_name, + mut table_type, + table_schema, + } = self; + + // Make the arrays + let catalog_name = catalog_name.finish(); + let db_schema_name = db_schema_name.finish(); + let table_name = table_name.finish(); + let table_type = table_type.finish(); + let table_schema = table_schema.map(|mut table_schema| table_schema.finish()); + + // apply any filters, getting a BooleanArray that represents + // the rows that passed the filter + let mut filters = vec![]; + + if let Some(catalog_filter_name) = catalog_filter { + filters.push(eq_utf8_scalar(&catalog_name, &catalog_filter_name)?); + } + + let tt_filter = table_types_filter + .into_iter() + .map(|tt| eq_utf8_scalar(&table_type, &tt)) + .collect::<std::result::Result<Vec<_>, _>>()? + .into_iter() + // We know the arrays are of same length as they are produced fromn the same root array + .reduce(|filter, arr| or(&filter, &arr).unwrap()); Review Comment: `or` should only fail on misaligned arrays, which we should never see here. ########## arrow-flight/src/sql/catalogs/mod.rs: ########## @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Builders and function for building responses to infromation schema requests +//! +//! - [`get_catalogs_batch`] and [`get_catalogs_schema`] for building responses to [`CommandGetCatalogs`] queries. +//! - [`GetSchemasBuilder`] and [`get_db_schemas_schema`] for building responses to [`CommandGetDbSchemas`] queries. +//! - [`GetTablesBuilder`] and [`get_tables_schema`] for building responses to [`CommandGetTables`] queries. +//! +//! [`CommandGetCatalogs`]: crate::sql::CommandGetCatalogs +//! [`CommandGetDbSchemas`]: crate::sql::CommandGetDbSchemas +//! [`CommandGetTables`]: crate::sql::CommandGetTables + +use std::sync::Arc; + +use arrow_array::{ArrayRef, RecordBatch, StringArray, UInt32Array}; +use arrow_row::{RowConverter, SortField}; +use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use once_cell::sync::Lazy; + +use crate::error::Result; + +pub use db_schemas::{get_db_schemas_schema, GetSchemasBuilder}; +pub use tables::{get_tables_schema, GetTablesBuilder}; + +mod db_schemas; +mod tables; + +/// Returns the RecordBatch for +pub fn get_catalogs_batch(mut catalog_names: Vec<String>) -> Result<RecordBatch> { Review Comment: Did a quick builder implementation - consistency is always nice ... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
