alamb commented on code in PR #4296:
URL: https://github.com/apache/arrow-rs/pull/4296#discussion_r1210227128
##########
arrow-flight/Cargo.toml:
##########
@@ -27,13 +27,17 @@ repository = { workspace = true }
license = { workspace = true }
[dependencies]
+arrow-arith = { workspace = true, optional = true }
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
# Cast is needed to work around https://github.com/apache/arrow-rs/issues/3389
arrow-cast = { workspace = true }
-arrow-data = { workspace = true }
+arrow-data = { workspace = true, optional = true }
Review Comment:
👍 nice cleanup
##########
arrow-flight/examples/flight_sql_server.rs:
##########
@@ -396,23 +477,113 @@ impl FlightSqlService for FlightSqlServiceImpl {
_query: CommandGetCatalogs,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- Err(Status::unimplemented("do_get_catalogs not implemented"))
+ let catalog_names = TABLES
+ .iter()
+ .map(|full_name|
full_name.split('.').collect::<Vec<_>>()[0].to_string())
+ .collect::<HashSet<_>>();
+ let batch = get_catalogs_batch(catalog_names.into_iter().collect());
+ let stream = FlightDataEncoderBuilder::new()
+ .with_schema(Arc::new(get_catalogs_schema().clone()))
+ .build(futures::stream::once(async { batch }))
+ .map_err(Status::from);
+ Ok(Response::new(Box::pin(stream)))
}
async fn do_get_schemas(
&self,
- _query: CommandGetDbSchemas,
+ query: CommandGetDbSchemas,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- Err(Status::unimplemented("do_get_schemas not implemented"))
+ let schemas = TABLES
+ .iter()
+ .map(|full_name| {
+ let parts = full_name.split('.').collect::<Vec<_>>();
+ (parts[0].to_string(), parts[1].to_string())
+ })
+ .collect::<HashSet<_>>();
+
+ let mut builder =
GetSchemasBuilder::new(query.db_schema_filter_pattern);
+ if let Some(catalog) = query.catalog {
Review Comment:
Given that the equality matching on `catalog` should be the same for all
implementations, what do you think about putting it in the builder itself?
The same comment applies to the `GetTablesBuilder` too.
We can do this as a follow-on PR too (or never).
Something like:
```rust
let mut builder =
GetSchemasBuilder::new(query.db_schema_filter_pattern);
if let Some(catalog) = query.catalog {
builder = builder.with_catalog_filter(catalog)
}
for (catalog_name, schema_name) in schemas {
builder
.append(catalog_name, schema_name)
.map_err(Status::from)?;
}
```
##########
arrow-flight/examples/flight_sql_server.rs:
##########
@@ -396,23 +477,113 @@ impl FlightSqlService for FlightSqlServiceImpl {
_query: CommandGetCatalogs,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- Err(Status::unimplemented("do_get_catalogs not implemented"))
+ let catalog_names = TABLES
+ .iter()
+ .map(|full_name|
full_name.split('.').collect::<Vec<_>>()[0].to_string())
+ .collect::<HashSet<_>>();
+ let batch = get_catalogs_batch(catalog_names.into_iter().collect());
+ let stream = FlightDataEncoderBuilder::new()
+ .with_schema(Arc::new(get_catalogs_schema().clone()))
+ .build(futures::stream::once(async { batch }))
+ .map_err(Status::from);
+ Ok(Response::new(Box::pin(stream)))
}
async fn do_get_schemas(
&self,
- _query: CommandGetDbSchemas,
+ query: CommandGetDbSchemas,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- Err(Status::unimplemented("do_get_schemas not implemented"))
+ let schemas = TABLES
+ .iter()
+ .map(|full_name| {
+ let parts = full_name.split('.').collect::<Vec<_>>();
+ (parts[0].to_string(), parts[1].to_string())
+ })
+ .collect::<HashSet<_>>();
+
+ let mut builder =
GetSchemasBuilder::new(query.db_schema_filter_pattern);
+ if let Some(catalog) = query.catalog {
+ for (catalog_name, schema_name) in schemas {
+ if catalog == catalog_name {
+ builder
+ .append(catalog_name, schema_name)
+ .map_err(Status::from)?;
+ }
+ }
+ } else {
+ for (catalog_name, schema_name) in schemas {
+ builder
+ .append(catalog_name, schema_name)
+ .map_err(Status::from)?;
+ }
+ };
+
+ let batch = builder.build();
+ let stream = FlightDataEncoderBuilder::new()
+ .with_schema(get_db_schemas_schema())
+ .build(futures::stream::once(async { batch }))
+ .map_err(Status::from);
+ Ok(Response::new(Box::pin(stream)))
}
async fn do_get_tables(
&self,
- _query: CommandGetTables,
+ query: CommandGetTables,
_request: Request<Ticket>,
) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
- Err(Status::unimplemented("do_get_tables not implemented"))
+ let tables = TABLES
Review Comment:
I wonder if it is worth a comment that this does not implement full
SQL-style multipart identifier semantics to warn anyone who tries to use this
code as a starting point for implementing FlightSQL? Maybe that will be obvious
to anyone who uses this code, but we fought for quite a while to get consistent
identifier semantics in DataFusion (one needs to parse it via SQL).
##########
arrow-flight/src/sql/catalogs/db_schemas.rs:
##########
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`GetSchemasBuilder`] for building responses to [`CommandGetDbSchemas`]
queries.
+//!
+//! [`CommandGetDbSchemas`]: crate::sql::CommandGetDbSchemas
+
+use std::sync::Arc;
+
+use arrow_array::{builder::StringBuilder, ArrayRef, RecordBatch};
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use arrow_select::{filter::filter_record_batch, take::take};
+use arrow_string::like::like_utf8_scalar;
+use once_cell::sync::Lazy;
+
+use super::lexsort_to_indices;
+use crate::error::*;
+
+/// Return the schema of the RecordBatch that will be returned from
[`CommandGetDbSchemas`]
+///
+/// [`CommandGetDbSchemas`]: crate::sql::CommandGetDbSchemas
+pub fn get_db_schemas_schema() -> SchemaRef {
+ Arc::clone(&GET_DB_SCHEMAS_SCHEMA)
+}
+
+/// The schema for GetDbSchemas
+static GET_DB_SCHEMAS_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
+ Arc::new(Schema::new(vec![
+ Field::new("catalog_name", DataType::Utf8, false),
+ Field::new("db_schema_name", DataType::Utf8, false),
+ ]))
+});
+
+/// Builds rows like this:
+///
+/// * catalog_name: utf8,
+/// * db_schema_name: utf8,
+pub struct GetSchemasBuilder {
+ // Optional filters to apply
+ db_schema_filter_pattern: Option<String>,
+ // array builder for catalog names
+ catalog_name: StringBuilder,
+ // array builder for schema names
+ db_schema_name: StringBuilder,
+}
+
+impl GetSchemasBuilder {
+ /// Create a new instance of [`GetSchemasBuilder`]
+ ///
+ /// The builder handles filtering by schemapatterns, the caller
Review Comment:
```suggestion
/// The builder handles filtering by schema patterns, the caller
```
##########
arrow-flight/src/sql/catalogs/db_schemas.rs:
##########
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`GetSchemasBuilder`] for building responses to [`CommandGetDbSchemas`]
queries.
+//!
+//! [`CommandGetDbSchemas`]: crate::sql::CommandGetDbSchemas
+
+use std::sync::Arc;
+
+use arrow_array::{builder::StringBuilder, ArrayRef, RecordBatch};
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use arrow_select::{filter::filter_record_batch, take::take};
+use arrow_string::like::like_utf8_scalar;
+use once_cell::sync::Lazy;
+
+use super::lexsort_to_indices;
+use crate::error::*;
+
+/// Return the schema of the RecordBatch that will be returned from
[`CommandGetDbSchemas`]
+///
+/// [`CommandGetDbSchemas`]: crate::sql::CommandGetDbSchemas
+pub fn get_db_schemas_schema() -> SchemaRef {
+ Arc::clone(&GET_DB_SCHEMAS_SCHEMA)
+}
+
+/// The schema for GetDbSchemas
+static GET_DB_SCHEMAS_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
+ Arc::new(Schema::new(vec![
+ Field::new("catalog_name", DataType::Utf8, false),
+ Field::new("db_schema_name", DataType::Utf8, false),
+ ]))
+});
+
+/// Builds rows like this:
+///
+/// * catalog_name: utf8,
+/// * db_schema_name: utf8,
+pub struct GetSchemasBuilder {
+ // Optional filters to apply
+ db_schema_filter_pattern: Option<String>,
+ // array builder for catalog names
+ catalog_name: StringBuilder,
+ // array builder for schema names
+ db_schema_name: StringBuilder,
+}
+
+impl GetSchemasBuilder {
+ /// Create a new instance of [`GetSchemasBuilder`]
+ ///
+ /// The builder handles filtering by schemapatterns, the caller
+ /// is expected to only pass in tables that match the catalog
+ /// from the [`CommandGetDbSchemas`] request.
+ ///
+ /// # Parameters
+ ///
+ /// - `db_schema_filter_pattern`: Specifies a filter pattern for schemas
to search for.
+ /// When no pattern is provided, the pattern will not be used to narrow
the search.
+ /// In the pattern string, two special characters can be used to denote
matching rules:
+ /// - "%" means to match any substring with 0 or more characters.
+ /// - "_" means to match any one character.
+ ///
+ /// [`CommandGetDbSchemas`]: crate::sql::CommandGetDbSchemas
+ pub fn new(db_schema_filter_pattern: Option<impl Into<String>>) -> Self {
+ let catalog_name = StringBuilder::new();
+ let db_schema_name = StringBuilder::new();
+ Self {
+ db_schema_filter_pattern: db_schema_filter_pattern.map(|v|
v.into()),
+ catalog_name,
+ db_schema_name,
+ }
+ }
+
+ /// Append a row
+ pub fn append(
+ &mut self,
+ catalog_name: impl AsRef<str>,
+ schema_name: impl AsRef<str>,
+ ) -> Result<()> {
+ self.catalog_name.append_value(catalog_name);
+ self.db_schema_name.append_value(schema_name);
+ Ok(())
+ }
+
+ /// builds the correct schema
Review Comment:
```suggestion
/// builds a `RecordBatch` with the correct schema for a
`CommandGetDbSchemas` response
```
##########
arrow-flight/src/sql/catalogs/db_schemas.rs:
##########
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`GetSchemasBuilder`] for building responses to [`CommandGetDbSchemas`]
queries.
+//!
+//! [`CommandGetDbSchemas`]: crate::sql::CommandGetDbSchemas
+
+use std::sync::Arc;
+
+use arrow_array::{builder::StringBuilder, ArrayRef, RecordBatch};
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use arrow_select::{filter::filter_record_batch, take::take};
+use arrow_string::like::like_utf8_scalar;
+use once_cell::sync::Lazy;
+
+use super::lexsort_to_indices;
+use crate::error::*;
+
+/// Return the schema of the RecordBatch that will be returned from
[`CommandGetDbSchemas`]
+///
+/// [`CommandGetDbSchemas`]: crate::sql::CommandGetDbSchemas
+pub fn get_db_schemas_schema() -> SchemaRef {
+ Arc::clone(&GET_DB_SCHEMAS_SCHEMA)
+}
+
+/// The schema for GetDbSchemas
+static GET_DB_SCHEMAS_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
+ Arc::new(Schema::new(vec![
+ Field::new("catalog_name", DataType::Utf8, false),
+ Field::new("db_schema_name", DataType::Utf8, false),
+ ]))
+});
+
+/// Builds rows like this:
+///
+/// * catalog_name: utf8,
+/// * db_schema_name: utf8,
+pub struct GetSchemasBuilder {
+ // Optional filters to apply
+ db_schema_filter_pattern: Option<String>,
+ // array builder for catalog names
+ catalog_name: StringBuilder,
+ // array builder for schema names
+ db_schema_name: StringBuilder,
+}
+
+impl GetSchemasBuilder {
+ /// Create a new instance of [`GetSchemasBuilder`]
+ ///
+ /// The builder handles filtering by schemapatterns, the caller
+ /// is expected to only pass in tables that match the catalog
+ /// from the [`CommandGetDbSchemas`] request.
+ ///
+ /// # Parameters
+ ///
+ /// - `db_schema_filter_pattern`: Specifies a filter pattern for schemas
to search for.
+ /// When no pattern is provided, the pattern will not be used to narrow
the search.
+ /// In the pattern string, two special characters can be used to denote
matching rules:
+ /// - "%" means to match any substring with 0 or more characters.
+ /// - "_" means to match any one character.
+ ///
+ /// [`CommandGetDbSchemas`]: crate::sql::CommandGetDbSchemas
+ pub fn new(db_schema_filter_pattern: Option<impl Into<String>>) -> Self {
+ let catalog_name = StringBuilder::new();
+ let db_schema_name = StringBuilder::new();
+ Self {
+ db_schema_filter_pattern: db_schema_filter_pattern.map(|v|
v.into()),
+ catalog_name,
+ db_schema_name,
+ }
+ }
+
+ /// Append a row
+ pub fn append(
+ &mut self,
+ catalog_name: impl AsRef<str>,
+ schema_name: impl AsRef<str>,
+ ) -> Result<()> {
+ self.catalog_name.append_value(catalog_name);
+ self.db_schema_name.append_value(schema_name);
+ Ok(())
+ }
+
+ /// builds the correct schema
+ pub fn build(self) -> Result<RecordBatch> {
+ let Self {
+ db_schema_filter_pattern,
+ mut catalog_name,
+ mut db_schema_name,
+ } = self;
+
+ // Make the arrays
+ let catalog_name = catalog_name.finish();
+ let db_schema_name = db_schema_name.finish();
+
+ // the filter, if requested, getting a BooleanArray that represents
the rows that passed the filter
+ let filter = db_schema_filter_pattern
+ .map(|db_schema_filter_pattern| {
+ // use like kernel to get wildcard matching
+ like_utf8_scalar(&db_schema_name, &db_schema_filter_pattern)
+ })
+ .transpose()?;
+
+ let batch = RecordBatch::try_new(
+ get_db_schemas_schema(),
+ vec![
+ Arc::new(catalog_name) as ArrayRef,
+ Arc::new(db_schema_name) as ArrayRef,
+ ],
+ )?;
+
+ // Apply the filters if needed
+ let filtered_batch = if let Some(filter) = filter {
+ filter_record_batch(&batch, &filter)?
+ } else {
+ batch
+ };
+
+ // Order filtered results by catalog_name, then db_schema_name
Review Comment:
👍
##########
arrow-flight/src/sql/catalogs/mod.rs:
##########
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Builders and function for building responses to infromation schema requests
Review Comment:
```suggestion
//! Builders and function for building responses to information schema
requests
```
##########
arrow-flight/src/sql/catalogs/mod.rs:
##########
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Builders and function for building responses to infromation schema requests
+//!
+//! - [`get_catalogs_batch`] and [`get_catalogs_schema`] for building
responses to [`CommandGetCatalogs`] queries.
+//! - [`GetSchemasBuilder`] and [`get_db_schemas_schema`] for building
responses to [`CommandGetDbSchemas`] queries.
+//! - [`GetTablesBuilder`] and [`get_tables_schema`] for building responses to
[`CommandGetTables`] queries.
+//!
+//! [`CommandGetCatalogs`]: crate::sql::CommandGetCatalogs
+//! [`CommandGetDbSchemas`]: crate::sql::CommandGetDbSchemas
+//! [`CommandGetTables`]: crate::sql::CommandGetTables
+
+use std::sync::Arc;
+
+use arrow_array::{ArrayRef, RecordBatch, StringArray, UInt32Array};
+use arrow_row::{RowConverter, SortField};
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use once_cell::sync::Lazy;
+
+use crate::error::Result;
+
+pub use db_schemas::{get_db_schemas_schema, GetSchemasBuilder};
+pub use tables::{get_tables_schema, GetTablesBuilder};
+
+mod db_schemas;
+mod tables;
+
+/// Returns the RecordBatch for
+pub fn get_catalogs_batch(mut catalog_names: Vec<String>) ->
Result<RecordBatch> {
Review Comment:
Creating a builder for this simple function is probably overkill, but it
might be nice to implement one so that it is consistent with
`GetSchemasBuilder` and `GetTablesBuilder`.
Maybe also as a follow-on PR.
##########
arrow-flight/src/sql/catalogs/mod.rs:
##########
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Builders and function for building responses to infromation schema requests
+//!
+//! - [`get_catalogs_batch`] and [`get_catalogs_schema`] for building
responses to [`CommandGetCatalogs`] queries.
+//! - [`GetSchemasBuilder`] and [`get_db_schemas_schema`] for building
responses to [`CommandGetDbSchemas`] queries.
+//! - [`GetTablesBuilder`] and [`get_tables_schema`] for building responses to
[`CommandGetTables`] queries.
+//!
+//! [`CommandGetCatalogs`]: crate::sql::CommandGetCatalogs
+//! [`CommandGetDbSchemas`]: crate::sql::CommandGetDbSchemas
+//! [`CommandGetTables`]: crate::sql::CommandGetTables
+
+use std::sync::Arc;
+
+use arrow_array::{ArrayRef, RecordBatch, StringArray, UInt32Array};
+use arrow_row::{RowConverter, SortField};
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use once_cell::sync::Lazy;
+
+use crate::error::Result;
+
+pub use db_schemas::{get_db_schemas_schema, GetSchemasBuilder};
+pub use tables::{get_tables_schema, GetTablesBuilder};
+
+mod db_schemas;
+mod tables;
+
+/// Returns the RecordBatch for
Review Comment:
```suggestion
/// Returns the RecordBatch for `CommandGetCatalogs`
```
##########
arrow-flight/src/sql/catalogs/tables.rs:
##########
@@ -0,0 +1,388 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`GetTablesBuilder`] for building responses to [`CommandGetTables`]
queries.
+//!
+//! [`CommandGetTables`]: crate::sql::CommandGetTables
+
+use std::sync::Arc;
+
+use arrow_arith::boolean::and;
+use arrow_array::builder::{BinaryBuilder, StringBuilder};
+use arrow_array::{ArrayRef, RecordBatch};
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use arrow_select::{filter::filter_record_batch, take::take};
+use arrow_string::like::like_utf8_scalar;
+use once_cell::sync::Lazy;
+
+use super::lexsort_to_indices;
+use crate::error::*;
+use crate::{IpcMessage, IpcWriteOptions, SchemaAsIpc};
+
+/// Return the schema of the RecordBatch that will be returned from
[`CommandGetTables`]
+///
+/// Note the schema differs based on the values of `include_schema
+///
+/// [`CommandGetTables`]: crate::sql::CommandGetTables
+pub fn get_tables_schema(include_schema: bool) -> SchemaRef {
+ if include_schema {
+ Arc::clone(&GET_TABLES_SCHEMA_WITH_TABLE_SCHEMA)
+ } else {
+ Arc::clone(&GET_TABLES_SCHEMA_WITHOUT_TABLE_SCHEMA)
+ }
+}
+
+/// Builds rows like this:
+///
+/// * catalog_name: utf8,
+/// * db_schema_name: utf8,
+/// * table_name: utf8 not null,
+/// * table_type: utf8 not null,
+/// * (optional) table_schema: bytes not null (schema of the table as described
+/// in Schema.fbs::Schema it is serialized as an IPC message.)
+pub struct GetTablesBuilder {
+ // Optional filters to apply to schemas
+ db_schema_filter_pattern: Option<String>,
+ // Optional filters to apply to tables
+ table_name_filter_pattern: Option<String>,
+ // array builder for catalog names
+ catalog_name: StringBuilder,
+ // array builder for db schema names
+ db_schema_name: StringBuilder,
+ // array builder for tables names
+ table_name: StringBuilder,
+ // array builder for table types
+ table_type: StringBuilder,
+ // array builder for table schemas
+ table_schema: Option<BinaryBuilder>,
+}
+
+impl GetTablesBuilder {
+ /// Create a new instance of [`GetTablesBuilder`]
+ ///
+ /// The builder handles filtering by schema and table patterns, the caller
+ /// is expected to only pass in tables that match the catalog and
table_type
+ /// from the [`CommandGetTables`] request.
+ ///
+ /// # Paramneters
+ ///
+ /// - `db_schema_filter_pattern`: Specifies a filter pattern for schemas
to search for.
+ /// When no pattern is provided, the pattern will not be used to narrow
the search.
+ /// In the pattern string, two special characters can be used to denote
matching rules:
+ /// - "%" means to match any substring with 0 or more characters.
+ /// - "_" means to match any one character.
+ /// - `table_name_filter_pattern`: Specifies a filter pattern for tables
to search for.
+ /// When no pattern is provided, all tables matching other filters are
searched.
+ /// In the pattern string, two special characters can be used to denote
matching rules:
+ /// - "%" means to match any substring with 0 or more characters.
+ /// - "_" means to match any one character.
+ /// - `include_schema`: Specifies if the Arrow schema should be returned
for found tables.
+ ///
+ /// [`CommandGetTables`]: crate::sql::CommandGetTables
+ pub fn new(
+ db_schema_filter_pattern: Option<impl Into<String>>,
+ table_name_filter_pattern: Option<impl Into<String>>,
+ include_schema: bool,
+ ) -> Self {
+ let catalog_name = StringBuilder::new();
+ let db_schema_name = StringBuilder::new();
+ let table_name = StringBuilder::new();
+ let table_type = StringBuilder::new();
+
+ let table_schema = if include_schema {
+ Some(BinaryBuilder::new())
+ } else {
+ None
+ };
+
+ Self {
+ db_schema_filter_pattern: db_schema_filter_pattern.map(|s|
s.into()),
+ table_name_filter_pattern: table_name_filter_pattern.map(|t|
t.into()),
+ catalog_name,
+ db_schema_name,
+ table_name,
+ table_type,
+ table_schema,
+ }
+ }
+
+ /// Append a row
+ pub fn append(
+ &mut self,
+ catalog_name: impl AsRef<str>,
+ schema_name: impl AsRef<str>,
+ table_name: impl AsRef<str>,
+ table_type: impl AsRef<str>,
+ table_schema: &Schema,
+ ) -> Result<()> {
+ self.catalog_name.append_value(catalog_name);
+ self.db_schema_name.append_value(schema_name);
+ self.table_name.append_value(table_name);
+ self.table_type.append_value(table_type);
+ if let Some(self_table_schema) = self.table_schema.as_mut() {
+ let options = IpcWriteOptions::default();
+ // encode the schema into the correct form
+ let message: std::result::Result<IpcMessage, _> =
+ SchemaAsIpc::new(table_schema, &options).try_into();
+ let IpcMessage(schema) = message?;
+ self_table_schema.append_value(schema);
+ }
+
+ Ok(())
+ }
+
+ /// builds the correct schema
+ pub fn build(self) -> Result<RecordBatch> {
+ let Self {
+ db_schema_filter_pattern,
+ table_name_filter_pattern,
+
+ mut catalog_name,
+ mut db_schema_name,
+ mut table_name,
+ mut table_type,
+ table_schema,
+ } = self;
+
+ // Make the arrays
+ let catalog_name = catalog_name.finish();
+ let db_schema_name = db_schema_name.finish();
+ let table_name = table_name.finish();
+ let table_type = table_type.finish();
+ let table_schema = table_schema.map(|mut table_schema|
table_schema.finish());
+
+ // apply any filters, getting a BooleanArray that represents
+ // the rows that passed the filter
+ let mut filters = vec![];
+
+ if let Some(db_schema_filter_pattern) = db_schema_filter_pattern {
+ // use like kernel to get wildcard matching
+ filters.push(like_utf8_scalar(
+ &db_schema_name,
+ &db_schema_filter_pattern,
+ )?)
+ }
+
+ if let Some(table_name_filter_pattern) = table_name_filter_pattern {
+ // use like kernel to get wildcard matching
+ filters.push(like_utf8_scalar(&table_name,
&table_name_filter_pattern)?)
+ }
+
+ let include_schema = table_schema.is_some();
+ let batch = if let Some(table_schema) = table_schema {
+ RecordBatch::try_new(
+ get_tables_schema(include_schema),
+ vec![
+ Arc::new(catalog_name) as ArrayRef,
+ Arc::new(db_schema_name) as ArrayRef,
+ Arc::new(table_name) as ArrayRef,
+ Arc::new(table_type) as ArrayRef,
+ Arc::new(table_schema) as ArrayRef,
+ ],
+ )
+ } else {
+ RecordBatch::try_new(
+ get_tables_schema(include_schema),
+ vec![
+ Arc::new(catalog_name) as ArrayRef,
+ Arc::new(db_schema_name) as ArrayRef,
+ Arc::new(table_name) as ArrayRef,
+ Arc::new(table_type) as ArrayRef,
+ ],
+ )
+ }?;
+
+ // `AND` any filters together
+ let mut total_filter = None;
+ while let Some(filter) = filters.pop() {
+ let new_filter = match total_filter {
+ Some(total_filter) => and(&total_filter, &filter)?,
+ None => filter,
+ };
+ total_filter = Some(new_filter);
+ }
+
+ // Apply the filters if needed
+ let filtered_batch = if let Some(total_filter) = total_filter {
+ filter_record_batch(&batch, &total_filter)?
+ } else {
+ batch
+ };
+
+ // Order filtered results by catalog_name, then db_schema_name, then
table_name, then table_type
Review Comment:
👍
##########
arrow-flight/src/sql/catalogs/tables.rs:
##########
@@ -0,0 +1,388 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`GetTablesBuilder`] for building responses to [`CommandGetTables`]
queries.
+//!
+//! [`CommandGetTables`]: crate::sql::CommandGetTables
+
+use std::sync::Arc;
+
+use arrow_arith::boolean::and;
+use arrow_array::builder::{BinaryBuilder, StringBuilder};
+use arrow_array::{ArrayRef, RecordBatch};
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use arrow_select::{filter::filter_record_batch, take::take};
+use arrow_string::like::like_utf8_scalar;
+use once_cell::sync::Lazy;
+
+use super::lexsort_to_indices;
+use crate::error::*;
+use crate::{IpcMessage, IpcWriteOptions, SchemaAsIpc};
+
+/// Return the schema of the RecordBatch that will be returned from
[`CommandGetTables`]
+///
+/// Note the schema differs based on the values of `include_schema
+///
+/// [`CommandGetTables`]: crate::sql::CommandGetTables
+pub fn get_tables_schema(include_schema: bool) -> SchemaRef {
+ if include_schema {
+ Arc::clone(&GET_TABLES_SCHEMA_WITH_TABLE_SCHEMA)
+ } else {
+ Arc::clone(&GET_TABLES_SCHEMA_WITHOUT_TABLE_SCHEMA)
+ }
+}
+
+/// Builds rows like this:
+///
+/// * catalog_name: utf8,
+/// * db_schema_name: utf8,
+/// * table_name: utf8 not null,
+/// * table_type: utf8 not null,
+/// * (optional) table_schema: bytes not null (schema of the table as described
+/// in Schema.fbs::Schema it is serialized as an IPC message.)
+pub struct GetTablesBuilder {
+ // Optional filters to apply to schemas
+ db_schema_filter_pattern: Option<String>,
+ // Optional filters to apply to tables
+ table_name_filter_pattern: Option<String>,
+ // array builder for catalog names
+ catalog_name: StringBuilder,
+ // array builder for db schema names
+ db_schema_name: StringBuilder,
+ // array builder for tables names
+ table_name: StringBuilder,
+ // array builder for table types
+ table_type: StringBuilder,
+ // array builder for table schemas
+ table_schema: Option<BinaryBuilder>,
+}
+
+impl GetTablesBuilder {
+ /// Create a new instance of [`GetTablesBuilder`]
+ ///
+ /// The builder handles filtering by schema and table patterns, the caller
+ /// is expected to only pass in tables that match the catalog and
table_type
+ /// from the [`CommandGetTables`] request.
+ ///
+ /// # Paramneters
+ ///
+ /// - `db_schema_filter_pattern`: Specifies a filter pattern for schemas
to search for.
+ /// When no pattern is provided, the pattern will not be used to narrow
the search.
+ /// In the pattern string, two special characters can be used to denote
matching rules:
+ /// - "%" means to match any substring with 0 or more characters.
+ /// - "_" means to match any one character.
+ /// - `table_name_filter_pattern`: Specifies a filter pattern for tables
to search for.
+ /// When no pattern is provided, all tables matching other filters are
searched.
+ /// In the pattern string, two special characters can be used to denote
matching rules:
+ /// - "%" means to match any substring with 0 or more characters.
+ /// - "_" means to match any one character.
+ /// - `include_schema`: Specifies if the Arrow schema should be returned
for found tables.
+ ///
+ /// [`CommandGetTables`]: crate::sql::CommandGetTables
+ pub fn new(
+ db_schema_filter_pattern: Option<impl Into<String>>,
+ table_name_filter_pattern: Option<impl Into<String>>,
+ include_schema: bool,
+ ) -> Self {
+ let catalog_name = StringBuilder::new();
+ let db_schema_name = StringBuilder::new();
+ let table_name = StringBuilder::new();
+ let table_type = StringBuilder::new();
+
+ let table_schema = if include_schema {
+ Some(BinaryBuilder::new())
+ } else {
+ None
+ };
+
+ Self {
+ db_schema_filter_pattern: db_schema_filter_pattern.map(|s|
s.into()),
+ table_name_filter_pattern: table_name_filter_pattern.map(|t|
t.into()),
+ catalog_name,
+ db_schema_name,
+ table_name,
+ table_type,
+ table_schema,
+ }
+ }
+
+ /// Append a row
+ pub fn append(
+ &mut self,
+ catalog_name: impl AsRef<str>,
+ schema_name: impl AsRef<str>,
+ table_name: impl AsRef<str>,
+ table_type: impl AsRef<str>,
+ table_schema: &Schema,
+ ) -> Result<()> {
+ self.catalog_name.append_value(catalog_name);
+ self.db_schema_name.append_value(schema_name);
+ self.table_name.append_value(table_name);
+ self.table_type.append_value(table_type);
+ if let Some(self_table_schema) = self.table_schema.as_mut() {
+ let options = IpcWriteOptions::default();
+ // encode the schema into the correct form
+ let message: std::result::Result<IpcMessage, _> =
+ SchemaAsIpc::new(table_schema, &options).try_into();
+ let IpcMessage(schema) = message?;
+ self_table_schema.append_value(schema);
+ }
+
+ Ok(())
+ }
+
+ /// builds the correct schema
Review Comment:
```suggestion
/// builds a `RecordBatch` for `CommandGetTables`
```
##########
arrow-flight/src/sql/catalogs/tables.rs:
##########
@@ -0,0 +1,388 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`GetTablesBuilder`] for building responses to [`CommandGetTables`]
queries.
+//!
+//! [`CommandGetTables`]: crate::sql::CommandGetTables
+
+use std::sync::Arc;
+
+use arrow_arith::boolean::and;
+use arrow_array::builder::{BinaryBuilder, StringBuilder};
+use arrow_array::{ArrayRef, RecordBatch};
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use arrow_select::{filter::filter_record_batch, take::take};
+use arrow_string::like::like_utf8_scalar;
+use once_cell::sync::Lazy;
+
+use super::lexsort_to_indices;
+use crate::error::*;
+use crate::{IpcMessage, IpcWriteOptions, SchemaAsIpc};
+
+/// Return the schema of the RecordBatch that will be returned from
[`CommandGetTables`]
+///
+/// Note the schema differs based on the value of `include_schema`
+///
+/// [`CommandGetTables`]: crate::sql::CommandGetTables
+pub fn get_tables_schema(include_schema: bool) -> SchemaRef {
+ if include_schema {
+ Arc::clone(&GET_TABLES_SCHEMA_WITH_TABLE_SCHEMA)
+ } else {
+ Arc::clone(&GET_TABLES_SCHEMA_WITHOUT_TABLE_SCHEMA)
+ }
+}
+
+/// Builds rows like this:
+///
+/// * catalog_name: utf8,
+/// * db_schema_name: utf8,
+/// * table_name: utf8 not null,
+/// * table_type: utf8 not null,
+/// * (optional) table_schema: bytes not null (schema of the table as described
+/// in Schema.fbs::Schema it is serialized as an IPC message.)
+pub struct GetTablesBuilder {
+ // Optional filters to apply to schemas
+ db_schema_filter_pattern: Option<String>,
+ // Optional filters to apply to tables
+ table_name_filter_pattern: Option<String>,
+ // array builder for catalog names
+ catalog_name: StringBuilder,
+ // array builder for db schema names
+ db_schema_name: StringBuilder,
+ // array builder for tables names
+ table_name: StringBuilder,
+ // array builder for table types
+ table_type: StringBuilder,
+ // array builder for table schemas
+ table_schema: Option<BinaryBuilder>,
+}
+
+impl GetTablesBuilder {
+ /// Create a new instance of [`GetTablesBuilder`]
+ ///
+ /// The builder handles filtering by schema and table patterns, the caller
Review Comment:
As I mentioned elsewhere I think it might be worth encapsulating all
filtering in the Builders to make the API easier to use
##########
arrow-flight/src/sql/catalogs/tables.rs:
##########
@@ -0,0 +1,388 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`GetTablesBuilder`] for building responses to [`CommandGetTables`]
queries.
+//!
+//! [`CommandGetTables`]: crate::sql::CommandGetTables
+
+use std::sync::Arc;
+
+use arrow_arith::boolean::and;
+use arrow_array::builder::{BinaryBuilder, StringBuilder};
+use arrow_array::{ArrayRef, RecordBatch};
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use arrow_select::{filter::filter_record_batch, take::take};
+use arrow_string::like::like_utf8_scalar;
+use once_cell::sync::Lazy;
+
+use super::lexsort_to_indices;
+use crate::error::*;
+use crate::{IpcMessage, IpcWriteOptions, SchemaAsIpc};
+
+/// Return the schema of the RecordBatch that will be returned from
[`CommandGetTables`]
+///
+/// Note the schema differs based on the value of `include_schema`
+///
+/// [`CommandGetTables`]: crate::sql::CommandGetTables
+pub fn get_tables_schema(include_schema: bool) -> SchemaRef {
+ if include_schema {
+ Arc::clone(&GET_TABLES_SCHEMA_WITH_TABLE_SCHEMA)
+ } else {
+ Arc::clone(&GET_TABLES_SCHEMA_WITHOUT_TABLE_SCHEMA)
+ }
+}
+
+/// Builds rows like this:
+///
+/// * catalog_name: utf8,
+/// * db_schema_name: utf8,
+/// * table_name: utf8 not null,
+/// * table_type: utf8 not null,
+/// * (optional) table_schema: bytes not null (schema of the table as described
+/// in Schema.fbs::Schema it is serialized as an IPC message.)
+pub struct GetTablesBuilder {
+ // Optional filters to apply to schemas
+ db_schema_filter_pattern: Option<String>,
+ // Optional filters to apply to tables
+ table_name_filter_pattern: Option<String>,
+ // array builder for catalog names
+ catalog_name: StringBuilder,
+ // array builder for db schema names
+ db_schema_name: StringBuilder,
+ // array builder for tables names
+ table_name: StringBuilder,
+ // array builder for table types
+ table_type: StringBuilder,
+ // array builder for table schemas
+ table_schema: Option<BinaryBuilder>,
+}
+
+impl GetTablesBuilder {
+ /// Create a new instance of [`GetTablesBuilder`]
+ ///
+ /// The builder handles filtering by schema and table patterns, the caller
+ /// is expected to only pass in tables that match the catalog and
table_type
+ /// from the [`CommandGetTables`] request.
+ ///
+ /// # Parameters
+ ///
+ /// - `db_schema_filter_pattern`: Specifies a filter pattern for schemas
to search for.
+ /// When no pattern is provided, the pattern will not be used to narrow
the search.
+ /// In the pattern string, two special characters can be used to denote
matching rules:
+ /// - "%" means to match any substring with 0 or more characters.
+ /// - "_" means to match any one character.
+ /// - `table_name_filter_pattern`: Specifies a filter pattern for tables
to search for.
+ /// When no pattern is provided, all tables matching other filters are
searched.
+ /// In the pattern string, two special characters can be used to denote
matching rules:
+ /// - "%" means to match any substring with 0 or more characters.
+ /// - "_" means to match any one character.
+ /// - `include_schema`: Specifies if the Arrow schema should be returned
for found tables.
+ ///
+ /// [`CommandGetTables`]: crate::sql::CommandGetTables
+ pub fn new(
+ db_schema_filter_pattern: Option<impl Into<String>>,
+ table_name_filter_pattern: Option<impl Into<String>>,
+ include_schema: bool,
+ ) -> Self {
+ let catalog_name = StringBuilder::new();
+ let db_schema_name = StringBuilder::new();
+ let table_name = StringBuilder::new();
+ let table_type = StringBuilder::new();
+
+ let table_schema = if include_schema {
+ Some(BinaryBuilder::new())
+ } else {
+ None
+ };
+
+ Self {
+ db_schema_filter_pattern: db_schema_filter_pattern.map(|s|
s.into()),
+ table_name_filter_pattern: table_name_filter_pattern.map(|t|
t.into()),
+ catalog_name,
+ db_schema_name,
+ table_name,
+ table_type,
+ table_schema,
+ }
+ }
+
+ /// Append a row
+ pub fn append(
+ &mut self,
+ catalog_name: impl AsRef<str>,
+ schema_name: impl AsRef<str>,
+ table_name: impl AsRef<str>,
+ table_type: impl AsRef<str>,
+ table_schema: &Schema,
+ ) -> Result<()> {
+ self.catalog_name.append_value(catalog_name);
+ self.db_schema_name.append_value(schema_name);
+ self.table_name.append_value(table_name);
+ self.table_type.append_value(table_type);
+ if let Some(self_table_schema) = self.table_schema.as_mut() {
+ let options = IpcWriteOptions::default();
+ // encode the schema into the correct form
+ let message: std::result::Result<IpcMessage, _> =
+ SchemaAsIpc::new(table_schema, &options).try_into();
+ let IpcMessage(schema) = message?;
+ self_table_schema.append_value(schema);
+ }
+
+ Ok(())
+ }
+
+ /// builds the correct schema
+ pub fn build(self) -> Result<RecordBatch> {
+ let Self {
+ db_schema_filter_pattern,
+ table_name_filter_pattern,
+
+ mut catalog_name,
+ mut db_schema_name,
+ mut table_name,
+ mut table_type,
+ table_schema,
+ } = self;
+
+ // Make the arrays
+ let catalog_name = catalog_name.finish();
+ let db_schema_name = db_schema_name.finish();
+ let table_name = table_name.finish();
+ let table_type = table_type.finish();
+ let table_schema = table_schema.map(|mut table_schema|
table_schema.finish());
+
+ // apply any filters, getting a BooleanArray that represents
+ // the rows that passed the filter
+ let mut filters = vec![];
+
+ if let Some(db_schema_filter_pattern) = db_schema_filter_pattern {
+ // use like kernel to get wildcard matching
+ filters.push(like_utf8_scalar(
+ &db_schema_name,
+ &db_schema_filter_pattern,
+ )?)
+ }
+
+ if let Some(table_name_filter_pattern) = table_name_filter_pattern {
+ // use like kernel to get wildcard matching
+ filters.push(like_utf8_scalar(&table_name,
&table_name_filter_pattern)?)
+ }
+
+ let include_schema = table_schema.is_some();
+ let batch = if let Some(table_schema) = table_schema {
+ RecordBatch::try_new(
+ get_tables_schema(include_schema),
+ vec![
+ Arc::new(catalog_name) as ArrayRef,
+ Arc::new(db_schema_name) as ArrayRef,
+ Arc::new(table_name) as ArrayRef,
+ Arc::new(table_type) as ArrayRef,
+ Arc::new(table_schema) as ArrayRef,
+ ],
+ )
+ } else {
+ RecordBatch::try_new(
+ get_tables_schema(include_schema),
+ vec![
+ Arc::new(catalog_name) as ArrayRef,
+ Arc::new(db_schema_name) as ArrayRef,
+ Arc::new(table_name) as ArrayRef,
+ Arc::new(table_type) as ArrayRef,
+ ],
+ )
+ }?;
+
+ // `AND` any filters together
+ let mut total_filter = None;
+ while let Some(filter) = filters.pop() {
+ let new_filter = match total_filter {
+ Some(total_filter) => and(&total_filter, &filter)?,
+ None => filter,
+ };
+ total_filter = Some(new_filter);
+ }
+
+ // Apply the filters if needed
+ let filtered_batch = if let Some(total_filter) = total_filter {
+ filter_record_batch(&batch, &total_filter)?
+ } else {
+ batch
+ };
+
+ // Order filtered results by catalog_name, then db_schema_name, then
table_name, then table_type
+ //
https://github.com/apache/arrow/blob/130f9e981aa98c25de5f5bfe55185db270cec313/format/FlightSql.proto#LL1202C1-L1202C1
+ let sort_cols = filtered_batch.project(&[0, 1, 2, 3])?;
+ let indices = lexsort_to_indices(sort_cols.columns());
+ let columns = filtered_batch
+ .columns()
+ .iter()
+ .map(|c| take(c, &indices, None))
+ .collect::<std::result::Result<Vec<_>, _>>()?;
+
+ Ok(RecordBatch::try_new(
+ get_tables_schema(include_schema),
+ columns,
+ )?)
+ }
+}
+
+/// The schema for GetTables without `table_schema` column
+static GET_TABLES_SCHEMA_WITHOUT_TABLE_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
+ Arc::new(Schema::new(vec![
+ Field::new("catalog_name", DataType::Utf8, false),
+ Field::new("db_schema_name", DataType::Utf8, false),
+ Field::new("table_name", DataType::Utf8, false),
+ Field::new("table_type", DataType::Utf8, false),
+ ]))
+});
+
+/// The schema for GetTables with `table_schema` column
+static GET_TABLES_SCHEMA_WITH_TABLE_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
+ Arc::new(Schema::new(vec![
+ Field::new("catalog_name", DataType::Utf8, false),
+ Field::new("db_schema_name", DataType::Utf8, false),
+ Field::new("table_name", DataType::Utf8, false),
+ Field::new("table_type", DataType::Utf8, false),
+ Field::new("table_schema", DataType::Binary, false),
+ ]))
+});
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow_array::{StringArray, UInt32Array};
+
+ fn get_ref_batch() -> RecordBatch {
+ RecordBatch::try_new(
+ get_tables_schema(false),
+ vec![
+ Arc::new(StringArray::from(vec![
+ "a_catalog",
+ "a_catalog",
+ "a_catalog",
+ "a_catalog",
+ "b_catalog",
+ "b_catalog",
+ "b_catalog",
+ "b_catalog",
+ ])) as ArrayRef,
+ Arc::new(StringArray::from(vec![
+ "a_schema", "a_schema", "b_schema", "b_schema", "a_schema",
+ "a_schema", "b_schema", "b_schema",
+ ])) as ArrayRef,
+ Arc::new(StringArray::from(vec![
+ "a_table", "b_table", "a_table", "b_table", "a_table",
"a_table",
+ "b_table", "b_table",
+ ])) as ArrayRef,
+ Arc::new(StringArray::from(vec![
+ "TABLE", "TABLE", "TABLE", "TABLE", "TABLE", "VIEW",
"TABLE", "VIEW",
+ ])) as ArrayRef,
+ ],
+ )
+ .unwrap()
+ }
+
+ #[test]
+ fn test_tables_are_filterd() {
Review Comment:
```suggestion
fn test_tables_are_filtered() {
```
##########
arrow-flight/src/sql/catalogs/db_schemas.rs:
##########
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`GetSchemasBuilder`] for building responses to [`CommandGetDbSchemas`]
queries.
+//!
+//! [`CommandGetDbSchemas`]: crate::sql::CommandGetDbSchemas
+
+use std::sync::Arc;
+
+use arrow_array::{builder::StringBuilder, ArrayRef, RecordBatch};
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use arrow_select::{filter::filter_record_batch, take::take};
+use arrow_string::like::like_utf8_scalar;
+use once_cell::sync::Lazy;
+
+use super::lexsort_to_indices;
+use crate::error::*;
+
+/// Return the schema of the RecordBatch that will be returned from
[`CommandGetDbSchemas`]
+///
+/// [`CommandGetDbSchemas`]: crate::sql::CommandGetDbSchemas
+pub fn get_db_schemas_schema() -> SchemaRef {
+ Arc::clone(&GET_DB_SCHEMAS_SCHEMA)
+}
+
+/// The schema for GetDbSchemas
+static GET_DB_SCHEMAS_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
+ Arc::new(Schema::new(vec![
+ Field::new("catalog_name", DataType::Utf8, false),
+ Field::new("db_schema_name", DataType::Utf8, false),
+ ]))
+});
+
+/// Builds rows like this:
+///
+/// * catalog_name: utf8,
+/// * db_schema_name: utf8,
+pub struct GetSchemasBuilder {
+ // Optional filters to apply
+ db_schema_filter_pattern: Option<String>,
+ // array builder for catalog names
+ catalog_name: StringBuilder,
+ // array builder for schema names
+ db_schema_name: StringBuilder,
+}
+
+impl GetSchemasBuilder {
+ /// Create a new instance of [`GetSchemasBuilder`]
+ ///
+ /// The builder handles filtering by schema patterns, the caller
+ /// is expected to only pass in schemas that match the catalog
+ /// from the [`CommandGetDbSchemas`] request.
+ ///
+ /// # Parameters
+ ///
+ /// - `db_schema_filter_pattern`: Specifies a filter pattern for schemas
to search for.
+ /// When no pattern is provided, the pattern will not be used to narrow
the search.
+ /// In the pattern string, two special characters can be used to denote
matching rules:
+ /// - "%" means to match any substring with 0 or more characters.
+ /// - "_" means to match any one character.
+ ///
+ /// [`CommandGetDbSchemas`]: crate::sql::CommandGetDbSchemas
+ pub fn new(db_schema_filter_pattern: Option<impl Into<String>>) -> Self {
+ let catalog_name = StringBuilder::new();
+ let db_schema_name = StringBuilder::new();
+ Self {
+ db_schema_filter_pattern: db_schema_filter_pattern.map(|v|
v.into()),
+ catalog_name,
+ db_schema_name,
+ }
+ }
+
+ /// Append a row
+ pub fn append(
+ &mut self,
+ catalog_name: impl AsRef<str>,
+ schema_name: impl AsRef<str>,
+ ) -> Result<()> {
+ self.catalog_name.append_value(catalog_name);
+ self.db_schema_name.append_value(schema_name);
+ Ok(())
+ }
+
+ /// builds the correct schema
+ pub fn build(self) -> Result<RecordBatch> {
+ let Self {
+ db_schema_filter_pattern,
+ mut catalog_name,
+ mut db_schema_name,
+ } = self;
+
+ // Make the arrays
+ let catalog_name = catalog_name.finish();
+ let db_schema_name = db_schema_name.finish();
+
+ // the filter, if requested, getting a BooleanArray that represents
the rows that passed the filter
+ let filter = db_schema_filter_pattern
+ .map(|db_schema_filter_pattern| {
+ // use like kernel to get wildcard matching
+ like_utf8_scalar(&db_schema_name, &db_schema_filter_pattern)
+ })
+ .transpose()?;
+
+ let batch = RecordBatch::try_new(
+ get_db_schemas_schema(),
+ vec![
+ Arc::new(catalog_name) as ArrayRef,
+ Arc::new(db_schema_name) as ArrayRef,
+ ],
+ )?;
+
+ // Apply the filters if needed
+ let filtered_batch = if let Some(filter) = filter {
+ filter_record_batch(&batch, &filter)?
+ } else {
+ batch
+ };
+
+ // Order filtered results by catalog_name, then db_schema_name
+ let indices = lexsort_to_indices(filtered_batch.columns());
+ let columns = filtered_batch
+ .columns()
+ .iter()
+ .map(|c| take(c, &indices, None))
+ .collect::<std::result::Result<Vec<_>, _>>()?;
+
+ Ok(RecordBatch::try_new(get_db_schemas_schema(), columns)?)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow_array::{StringArray, UInt32Array};
+
+ fn get_ref_batch() -> RecordBatch {
+ RecordBatch::try_new(
+ get_db_schemas_schema(),
+ vec![
+ Arc::new(StringArray::from(vec![
+ "a_catalog",
+ "a_catalog",
+ "b_catalog",
+ "b_catalog",
+ ])) as ArrayRef,
+ Arc::new(StringArray::from(vec![
+ "a_schema", "b_schema", "a_schema", "b_schema",
+ ])) as ArrayRef,
+ ],
+ )
+ .unwrap()
+ }
+
+ #[test]
+ fn test_schemas_are_filterd() {
Review Comment:
```suggestion
fn test_schemas_are_filtered() {
```
##########
arrow-flight/examples/flight_sql_server.rs:
##########
@@ -248,32 +255,106 @@ impl FlightSqlService for FlightSqlServiceImpl {
async fn get_flight_info_catalogs(
&self,
- _query: CommandGetCatalogs,
- _request: Request<FlightDescriptor>,
+ query: CommandGetCatalogs,
+ request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
- Err(Status::unimplemented(
- "get_flight_info_catalogs not implemented",
- ))
+ let flight_descriptor = request.into_inner();
+ let ticket = Ticket {
+ ticket: query.encode_to_vec().into(),
+ };
+
+ let options = IpcWriteOptions::default();
+
+ // encode the schema into the correct form
+ let IpcMessage(schema) = SchemaAsIpc::new(get_catalogs_schema(),
&options)
+ .try_into()
+ .expect("valid catalogs schema");
Review Comment:
I think we might be able to use the code from
https://github.com/apache/arrow-rs/pull/4294 to reduce this replication as well
as make it clearer this should never error 🤔
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]