milenkovicm commented on code in PR #1420:
URL:
https://github.com/apache/datafusion-ballista/pull/1420#discussion_r2795572324
##########
ballista/core/src/extension.rs:
##########
@@ -665,6 +671,100 @@ impl BallistaConfigGrpcEndpoint {
#[derive(Clone, Copy)]
pub struct BallistaUseTls(pub bool);
+#[derive(Debug)]
+struct BallistaCacheFactory;
+
+impl BallistaCacheFactory {
+ fn new() -> Self {
+ Self {}
+ }
+}
+
+impl CacheFactory for BallistaCacheFactory {
+ fn create(
+ &self,
+ plan: LogicalPlan,
+ session_state: &SessionState,
+ ) -> datafusion::error::Result<LogicalPlan> {
+ Ok(LogicalPlan::Extension(Extension {
+ node: Arc::new(BallistaCacheNode::new(
+ Uuid::new_v4().to_string(),
+ session_state.session_id().to_string(),
+ plan,
+ )),
+ }))
+ }
+}
+
+/// Ballista logical Extension for caching.
+#[derive(PartialEq, Eq, PartialOrd, Hash, Debug)]
+pub struct BallistaCacheNode {
+ cache_id: String,
+ session_id: String,
+ input: LogicalPlan,
+ exprs: Vec<Expr>,
+}
+
+impl BallistaCacheNode {
+ /// Create a new cache node from provided logical input plan and cache infos.
+ pub fn new(cache_id: String, session_id: String, input: LogicalPlan) -> Self {
+ Self {
+ cache_id,
+ session_id,
+ input,
+ exprs: vec![],
+ }
+ }
+
+ /// Returns cache id.
+ pub fn cache_id(&self) -> &str {
+ self.cache_id.as_str()
+ }
+
+ /// Returns session id.
+ pub fn session_id(&self) -> &str {
+ self.session_id.as_str()
+ }
+}
+
+impl UserDefinedLogicalNodeCore for BallistaCacheNode {
+ fn name(&self) -> &str {
+ "BallistaCacheNode"
+ }
+
+ fn inputs(&self) -> Vec<&LogicalPlan> {
+ vec![&self.input]
+ }
+
+ fn schema(&self) -> &DFSchemaRef {
+ self.input.schema()
+ }
+
+ fn expressions(&self) -> Vec<Expr> {
+ self.exprs.clone()
Review Comment:
Can we just return an empty vector here instead of keeping an `exprs` field?
Will it ever be non-empty?
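
A minimal sketch of the simplification asked about here, under stated assumptions: `Plan`, `Expr`, and the `Node` trait below are hypothetical stand-ins for DataFusion's `LogicalPlan`, `Expr`, and `UserDefinedLogicalNodeCore`, so this only illustrates the shape of the change, not the actual PR code. The `exprs` field is dropped and `expressions()` returns an empty vector directly.

```rust
// Stand-in types (assumptions); the real code would use DataFusion's
// LogicalPlan and Expr.
#[derive(Debug, Clone, PartialEq)]
struct Plan(String);

#[derive(Debug, Clone, PartialEq)]
struct Expr;

// Hypothetical, trimmed-down stand-in for UserDefinedLogicalNodeCore.
trait Node {
    fn inputs(&self) -> Vec<&Plan>;
    fn expressions(&self) -> Vec<Expr>;
}

// Cache node without an `exprs` field.
#[derive(Debug)]
struct CacheNode {
    cache_id: String,
    session_id: String,
    input: Plan,
}

impl Node for CacheNode {
    fn inputs(&self) -> Vec<&Plan> {
        vec![&self.input]
    }

    // The node carries no expressions of its own, so nothing is stored.
    fn expressions(&self) -> Vec<Expr> {
        vec![]
    }
}

fn main() {
    let node = CacheNode {
        cache_id: "c1".to_string(),
        session_id: "s1".to_string(),
        input: Plan("scan".to_string()),
    };
    assert!(node.expressions().is_empty());
    assert_eq!(node.inputs().len(), 1);
    println!("{:?}", node);
}
```

If the node ever needs to carry expressions, the field can be reintroduced without changing the trait signature.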
##########
ballista/core/src/extension.rs:
##########
@@ -665,6 +671,100 @@ impl BallistaConfigGrpcEndpoint {
#[derive(Clone, Copy)]
pub struct BallistaUseTls(pub bool);
+#[derive(Debug)]
+struct BallistaCacheFactory;
+
+impl BallistaCacheFactory {
+ fn new() -> Self {
+ Self {}
+ }
+}
+
+impl CacheFactory for BallistaCacheFactory {
+ fn create(
+ &self,
+ plan: LogicalPlan,
+ session_state: &SessionState,
+ ) -> datafusion::error::Result<LogicalPlan> {
+ Ok(LogicalPlan::Extension(Extension {
Review Comment:
I wonder if we should have a Ballista config option to "shortcut" the cache.
At the moment `cache()` will fail, as no cache functionality is implemented.
But if we flip the switch, instead of returning the `LogicalPlan` extension we
can return `plan` unchanged. There will be no caching, but pipelines relying on
`cache()` will not fail (it will be a no-op). WDYT?
##########
ballista/core/proto/ballista.proto:
##########
@@ -27,6 +27,15 @@ option java_outer_classname = "BallistaProto";
import "datafusion.proto";
import "datafusion_common.proto";
+///////////////////////////////////////////////////////////////////////////////////////////////////
+// Ballista Logical Plan
+///////////////////////////////////////////////////////////////////////////////////////////////////
+
Review Comment:
I would suggest adding an additional node, `BallistaLogicalPlanNode`, with a
`oneof` that includes `BallistaLogicalPlanCacheNode`, similar to
`BallistaPhysicalPlanNode`.
Maybe `CacheLogicalPlanExtension` would be a better name for
`BallistaLogicalPlanCacheNode`.
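
To make the suggested layout concrete, here is a rough Rust model of a wrapper node with a `oneof`, written with plain types rather than generated protobuf code. Everything in it is an assumption for illustration: the wrapper is spelled `BallistaLogicalPlanNode` by analogy with `BallistaPhysicalPlanNode`, the `logical_plan_type` field and `LogicalPlanType` enum are hypothetical names, and the `cache_id` / `session_id` fields are guessed from `BallistaCacheNode`, not taken from the `.proto` file.

```rust
// Hypothetical message for the cache extension; field names are assumed
// from the BallistaCacheNode struct, not from the actual .proto file.
#[derive(Debug, Clone, PartialEq)]
struct CacheLogicalPlanExtension {
    cache_id: String,
    session_id: String,
}

// Models the proposed `oneof`: one variant per Ballista logical extension.
#[derive(Debug, Clone, PartialEq)]
enum LogicalPlanType {
    Cache(CacheLogicalPlanExtension),
}

// Wrapper node; the Option models a `oneof` that may be left unset.
#[derive(Debug, Clone, PartialEq)]
struct BallistaLogicalPlanNode {
    logical_plan_type: Option<LogicalPlanType>,
}

// Dispatch on the variant when decoding; an unset `oneof` is an error.
fn describe(node: &BallistaLogicalPlanNode) -> Result<String, String> {
    match &node.logical_plan_type {
        Some(LogicalPlanType::Cache(c)) => {
            Ok(format!("cache:{}:{}", c.session_id, c.cache_id))
        }
        None => Err("missing logical plan type".to_string()),
    }
}

fn main() {
    let node = BallistaLogicalPlanNode {
        logical_plan_type: Some(LogicalPlanType::Cache(CacheLogicalPlanExtension {
            cache_id: "c1".to_string(),
            session_id: "s1".to_string(),
        })),
    };
    assert_eq!(describe(&node), Ok("cache:s1:c1".to_string()));
}
```

With this shape, a new logical extension becomes one more enum variant, and the decoding side keeps a single entry point.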
##########
ballista/core/src/extension.rs:
##########
@@ -665,6 +671,100 @@ impl BallistaConfigGrpcEndpoint {
#[derive(Clone, Copy)]
pub struct BallistaUseTls(pub bool);
+#[derive(Debug)]
+struct BallistaCacheFactory;
+
+impl BallistaCacheFactory {
+ fn new() -> Self {
+ Self {}
+ }
+}
+
+impl CacheFactory for BallistaCacheFactory {
+ fn create(
+ &self,
+ plan: LogicalPlan,
+ session_state: &SessionState,
+ ) -> datafusion::error::Result<LogicalPlan> {
+ Ok(LogicalPlan::Extension(Extension {
Review Comment:
```
if config.ballista_cache_noop {
    Ok(plan)
} else {
    Ok(LogicalPlan::Extension ...
    ..
}
}
```
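
A self-contained version of the same idea, as a sketch only: `Plan`, `Config`, and its `cache_noop` field are hypothetical stand-ins (the real code would use DataFusion's `LogicalPlan` and a Ballista config option whose name is not settled in this thread). It shows the no-op branch handing the input plan back untouched and the default branch wrapping it.

```rust
// Stand-in for LogicalPlan (assumption); `Cache` plays the role of
// LogicalPlan::Extension wrapping a cache node.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(String),
    Cache { cache_id: String, input: Box<Plan> },
}

// Hypothetical config; `cache_noop` mirrors the `ballista_cache_noop` flag.
struct Config {
    cache_noop: bool,
}

// Returns the plan unchanged when the no-op switch is on, otherwise wraps it.
fn create(config: &Config, plan: Plan, cache_id: &str) -> Result<Plan, String> {
    if config.cache_noop {
        Ok(plan)
    } else {
        Ok(Plan::Cache {
            cache_id: cache_id.to_string(),
            input: Box::new(plan),
        })
    }
}

fn main() {
    let scan = Plan::Scan("t".to_string());

    // Switch on: cache() becomes a no-op and the pipeline keeps working.
    let noop = create(&Config { cache_noop: true }, scan.clone(), "c1");
    assert_eq!(noop, Ok(scan.clone()));

    // Switch off: the plan is wrapped in the cache extension node.
    let wrapped = create(&Config { cache_noop: false }, scan.clone(), "c1");
    assert_eq!(
        wrapped,
        Ok(Plan::Cache { cache_id: "c1".to_string(), input: Box::new(scan) })
    );
}
```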
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]