alamb commented on code in PR #3907:
URL: https://github.com/apache/arrow-datafusion/pull/3907#discussion_r1003695455
##########
datafusion/core/src/execution/context.rs:
##########
@@ -418,19 +418,18 @@ impl SessionContext {
cmd: &CreateExternalTable,
) -> Result<Arc<DataFrame>> {
let state = self.state.read().clone();
+ let file_type = cmd.file_type.to_lowercase();
Review Comment:
If `table_factories` assumes that file types are always lowercase, would it
make sense to update the comment on that field to say so?
https://github.com/apache/arrow-datafusion/blob/6e0097d35391fea0d57c1d2ecfdef18437f681f4/datafusion/core/src/execution/runtime_env.rs#L48
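A minimal sketch of that invariant, assuming the convention is to normalize file types to lowercase on both insertion and lookup (the diff only lowercases on lookup). `FactoryRegistry` and its methods are hypothetical, not DataFusion's `RuntimeConfig`/`RuntimeEnv` API, and the generic `F` stands in for `Arc<dyn TableProviderFactory>`:

```rust
use std::collections::HashMap;

/// Hypothetical registry that stores factories under lowercase keys.
/// `F` stands in for `Arc<dyn TableProviderFactory>`.
pub struct FactoryRegistry<F> {
    factories: HashMap<String, F>,
}

impl<F> FactoryRegistry<F> {
    pub fn new() -> Self {
        Self {
            factories: HashMap::new(),
        }
    }

    /// Normalize on insert, so "DeltaTable" and "deltatable" map to one key.
    pub fn register(&mut self, file_type: &str, factory: F) -> Option<F> {
        self.factories.insert(file_type.to_lowercase(), factory)
    }

    /// Normalize on lookup, mirroring `cmd.file_type.to_lowercase()` in the diff.
    pub fn get(&self, file_type: &str) -> Option<&F> {
        self.factories.get(&file_type.to_lowercase())
    }
}
```

If both paths normalize, callers can register `"DeltaTable"` and still match a lowercased `cmd.file_type`; if only lookup normalizes (as in the diff), the doc comment has to state that registered keys must already be lowercase.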
##########
datafusion/proto/src/logical_plan.rs:
##########
@@ -93,6 +100,22 @@ pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
Self: Sized;
}
+pub trait PhysicalExtensionCodec: Debug + Send + Sync {
Review Comment:
Is this trait a leftover? I didn't think this PR added support for
serializing/deserializing physical plans.
##########
datafusion/proto/src/lib.rs:
##########
@@ -129,6 +135,26 @@ mod roundtrip_tests {
Ok(())
}
+ #[tokio::test]
+ async fn roundtrip_custom_tables() -> Result<(), DataFusionError> {
+ let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
+ HashMap::new();
+ table_factories.insert("deltatable".to_string(), Arc::new(TestTableFactory {}));
+ let cfg = RuntimeConfig::new().with_table_factories(table_factories);
Review Comment:
Yeah, this makes the most sense: basically, the system that created the
table provider needs to provide the details of how to serialize/deserialize it.
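A minimal sketch of that division of responsibility, using only `std`. `Table`, `CustomTable`, and `CustomTableCodec` are hypothetical stand-ins (DataFusion's `TableProvider` likewise exposes `as_any`, which is what makes the downcast possible); the encode signature mirrors `try_encode_table_provider` from the diff, while decoding is kept synchronous here for brevity:

```rust
use std::any::Any;
use std::sync::Arc;

/// Hypothetical stand-in for a table provider trait object.
pub trait Table: Any + Send + Sync {
    fn as_any(&self) -> &dyn Any;
}

/// Hypothetical custom table whose only state is a storage location.
#[derive(Debug, PartialEq)]
pub struct CustomTable {
    pub location: String,
}

impl Table for CustomTable {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Hypothetical codec owned by the same system that creates `CustomTable`,
/// so it is the one place that knows how to (de)serialize it.
pub struct CustomTableCodec;

impl CustomTableCodec {
    pub fn try_encode(&self, table: Arc<dyn Table>, buf: &mut Vec<u8>) -> Result<(), String> {
        // Only tables this system created can be encoded; others are rejected.
        let custom = table
            .as_any()
            .downcast_ref::<CustomTable>()
            .ok_or_else(|| "unsupported table type".to_string())?;
        buf.extend_from_slice(custom.location.as_bytes());
        Ok(())
    }

    pub fn try_decode(&self, buf: &[u8]) -> Result<Arc<dyn Table>, String> {
        // Rebuild the concrete table from the bytes written by `try_encode`.
        let location = String::from_utf8(buf.to_vec()).map_err(|e| e.to_string())?;
        Ok(Arc::new(CustomTable { location }))
    }
}
```

Anything the codec does not recognize fails the downcast and is rejected, similar in spirit to the `NotImplemented` error that `DefaultExtensionCodec` returns.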
##########
datafusion/proto/src/bytes/mod.rs:
##########
@@ -180,6 +188,27 @@ impl LogicalExtensionCodec for DefaultExtensionCodec {
"No extension codec provided".to_string(),
))
}
+
+ async fn try_decode_table_provider(
+ &self,
+ _buf: &[u8],
+ _schema: SchemaRef,
+ _ctx: &SessionContext,
+ ) -> std::result::Result<Arc<dyn TableProvider>, DataFusionError> {
+ Err(DataFusionError::NotImplemented(
+ "No extension codec provided".to_string(),
Review Comment:
```suggestion
"No codec provided for TableProviders".to_string(),
```
##########
datafusion/proto/src/logical_plan.rs:
##########
@@ -625,11 +714,11 @@ impl AsLogicalPlan for LogicalPlanNode {
builder.build()
}
LogicalPlanType::Union(union) => {
- let mut input_plans: Vec<LogicalPlan> = union
- .inputs
- .iter()
- .map(|i| i.try_into_logical_plan(ctx, extension_codec))
- .collect::<Result<_, DataFusionError>>()?;
+ let mut input_plans: Vec<LogicalPlan> = vec![];
Review Comment:
Having to make something async, and therefore switch away from a functional
style, is unfortunate. One can also use streams, but that syntax (or really,
the compiler errors if you get it wrong) gets wild.
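A minimal sketch of the imperative pattern the diff switches to, using only `std`. `try_convert` is a hypothetical stand-in for `try_into_logical_plan(ctx, extension_codec)`, and `block_on` is a toy busy-polling executor included only so the example runs without an async runtime:

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for futures that never return `Pending`.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor for this example only: it busy-polls the future to completion.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Hypothetical async conversion of one input, standing in for
/// `i.try_into_logical_plan(ctx, extension_codec).await`.
async fn try_convert(input: &str) -> Result<usize, String> {
    input
        .parse::<usize>()
        .map_err(|e| format!("bad input {input:?}: {e}"))
}

/// Imperative version: `.await` and `?` work inside a `for` loop, so
/// conversion stops at the first error, like `collect::<Result<_, _>>()?`.
pub async fn convert_all(inputs: &[&str]) -> Result<Vec<usize>, String> {
    let mut outputs = Vec::with_capacity(inputs.len());
    for input in inputs {
        outputs.push(try_convert(input).await?);
    }
    Ok(outputs)
}
```

The loop keeps the short-circuit-on-first-error behavior of the original `.collect::<Result<_, DataFusionError>>()?`. The stream-based alternative would go through the `futures` crate (`stream::iter` with `StreamExt::then` and `TryStreamExt::try_collect`), which is the style the comment calls wild.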
##########
datafusion/proto/src/logical_plan.rs:
##########
@@ -93,6 +100,22 @@ pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
Self: Sized;
}
+pub trait PhysicalExtensionCodec: Debug + Send + Sync {
+ fn try_decode(
+ &self,
+ buf: &[u8],
+ inputs: &[Arc<dyn ExecutionPlan>],
+ registry: &dyn FunctionRegistry,
+ ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError>;
+
+ fn try_encode(
+ &self,
+ node: Arc<dyn ExecutionPlan>,
+ buf: &mut Vec<u8>,
+ ) -> Result<(), DataFusionError>;
+}
+
+#[async_trait]
pub trait LogicalExtensionCodec: Debug + Send + Sync {
Review Comment:
We wonder whether we should rename `LogicalExtensionCodec` to something more
generic, since it now also handles table providers, but perhaps we can do
that in the future.
##########
datafusion/proto/src/bytes/mod.rs:
##########
@@ -180,6 +188,27 @@ impl LogicalExtensionCodec for DefaultExtensionCodec {
"No extension codec provided".to_string(),
))
}
+
+ async fn try_decode_table_provider(
+ &self,
+ _buf: &[u8],
+ _schema: SchemaRef,
+ _ctx: &SessionContext,
+ ) -> std::result::Result<Arc<dyn TableProvider>, DataFusionError> {
+ Err(DataFusionError::NotImplemented(
+ "No extension codec provided".to_string(),
+ ))
+ }
+
+ fn try_encode_table_provider(
+ &self,
+ _node: Arc<dyn TableProvider>,
+ _buf: &mut Vec<u8>,
+ ) -> std::result::Result<(), DataFusionError> {
+ Err(DataFusionError::NotImplemented(
+ "No extension codec provided".to_string(),
Review Comment:
```suggestion
"No codec provided for TableProviders".to_string(),
```
##########
datafusion/proto/src/bytes/mod.rs:
##########
@@ -132,37 +136,41 @@ pub fn logical_plan_to_bytes_with_extension_codec(
/// Deserialize a LogicalPlan from json
#[cfg(feature = "json")]
-pub fn logical_plan_from_json(json: &str, ctx: &SessionContext) -> Result<LogicalPlan> {
+pub async fn logical_plan_from_json(
+ json: &str,
+ ctx: &SessionContext,
+) -> Result<LogicalPlan> {
let back: protobuf::LogicalPlanNode = serde_json::from_str(json)
.map_err(|e| DataFusionError::Plan(format!("Error serializing plan: {}", e)))?;
let extension_codec = DefaultExtensionCodec {};
- back.try_into_logical_plan(ctx, &extension_codec)
+ back.try_into_logical_plan(ctx, &extension_codec).await
}
/// Deserialize a LogicalPlan from bytes
-pub fn logical_plan_from_bytes(
+pub async fn logical_plan_from_bytes(
Review Comment:
It makes sense to me that these functions must become `async` in order to
(potentially) instantiate a table provider 👍
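A minimal sketch of why the `async` propagates, assuming a simplified, hypothetical `TableDecoder` trait in place of `LogicalExtensionCodec::try_decode_table_provider`. The boxed-future return type is roughly what `#[async_trait]` generates; `plan_from_bytes` (also hypothetical) awaits it and therefore has to be `async` too, and a synchronous caller needs an executor, here a toy `block_on`, to drive it:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Roughly the return type `#[async_trait]` generates for an `async fn`.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Hypothetical, simplified decoder trait: building a table may need I/O,
/// so decoding returns a future.
pub trait TableDecoder: Send + Sync {
    fn try_decode_table<'a>(&'a self, buf: &'a [u8]) -> BoxFuture<'a, Result<String, String>>;
}

/// Hypothetical decoder that treats the bytes as a table name.
pub struct NameDecoder;

impl TableDecoder for NameDecoder {
    fn try_decode_table<'a>(&'a self, buf: &'a [u8]) -> BoxFuture<'a, Result<String, String>> {
        Box::pin(async move { String::from_utf8(buf.to_vec()).map_err(|e| e.to_string()) })
    }
}

/// Awaiting the decoder forces this function to be `async` as well.
pub async fn plan_from_bytes(buf: &[u8], decoder: &dyn TableDecoder) -> Result<String, String> {
    let table = decoder.try_decode_table(buf).await?;
    Ok(format!("TableScan: {table}"))
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor so a synchronous caller can drive the future (busy-polls).
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

In real code the executor would be the application's runtime, e.g. Tokio, which the roundtrip tests already use via `#[tokio::test]`.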