janbraunsdorff opened a new issue, #1268:
URL: https://github.com/apache/datafusion-ballista/issues/1268
**Describe the bug**
Hello,
I am trying to replace Spark with Ballista. The use case is to read a Delta table
from S3, do some transformations, and write it back to S3. Unfortunately, I am stuck on
the scheduler. No matter what I try, I always get: `Could not parse logical plan
protobuf: Internal error: Error encoding delta table`. I also ran the same code in
standalone mode, where everything works as expected. Can someone help me, or does
anyone have a working example?
**To Reproduce**
Environment: Kubernetes cluster with 1 scheduler and 1 executor, both on version
46.0.1.
I extended the scheduler and executor as described in
https://github.com/milenkovicm/ballista_delta and
https://github.com/apache/datafusion-ballista/issues/1241.
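Scheduler:
```rust
// bin/main.rs
// S3-aware config/session overrides plus the Delta logical and physical codecs.
let config = config
    .with_override_config_producer(Arc::new(session_config_with_s3_support))
    .with_override_session_builder(Arc::new(session_state_with_s3_support))
    .with_override_logical_codec(Arc::new(BallistaDeltaLogicalCodec::default()))
    .with_override_physical_codec(Arc::new(BallistaDeltaPhysicalCodec::default()));

// config.rs (methods added to the scheduler's SchedulerConfig)
use std::sync::Arc;
use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;

impl SchedulerConfig {
    // ...
    /// Replace the logical extension codec the scheduler uses to decode client plans.
    pub fn with_override_logical_codec(
        mut self,
        override_logical_codec: Arc<dyn LogicalExtensionCodec>,
    ) -> Self {
        self.override_logical_codec = Some(override_logical_codec);
        self
    }

    /// Replace the physical extension codec used for physical plans.
    pub fn with_override_physical_codec(
        mut self,
        override_physical_codec: Arc<dyn PhysicalExtensionCodec>,
    ) -> Self {
        self.override_physical_codec = Some(override_physical_codec);
        self
    }
}
```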
Executor:
```rust
// Same S3 support and Delta codecs as the scheduler.
let mut config: ExecutorProcessConfig = opt.try_into()?;
config.override_config_producer = Some(Arc::new(session_config_with_s3_support));
config.override_runtime_producer = Some(Arc::new(runtime_env_with_s3_support));
config.override_logical_codec = Some(Arc::new(BallistaDeltaLogicalCodec::default()));
config.override_physical_codec = Some(Arc::new(BallistaDeltaPhysicalCodec::default()));
```
Client (main):
```rust
// Session config: Ballista defaults plus the Delta codecs and S3 options.
let config = SessionConfig::new_with_ballista()
    .with_information_schema(true)
    .with_ballista_logical_extension_codec(Arc::new(BallistaDeltaLogicalCodec::default()))
    .with_ballista_physical_extension_codec(Arc::new(BallistaDeltaPhysicalCodec::default()))
    .with_option_extension(S3Options::default());

// Runtime environment with the S3 object store registered for the bucket.
let runtime_env = RuntimeEnvBuilder::new().build()?;
runtime_env.register_object_store(
    &format!("s3://{}", BUCKET).as_str().try_into().unwrap(),
    Arc::new(create_s3_store()?),
);

let state = SessionStateBuilder::new()
    .with_runtime_env(runtime_env.into())
    .with_config(config)
    .with_default_features()
    .with_table_factory("DELTATABLE".to_string(), Arc::new(DeltaTableFactory {}))
    .build();

// Connect to the remote scheduler with the prepared session state.
let ctx: SessionContext =
    SessionContext::remote_with_state("df://localhost:50050", state).await?;

// using SQL
ctx.register_object_store(
    &format!("s3://{}", BUCKET).as_str().try_into()?,
    Arc::new(create_s3_store()?),
);
ctx.sql(&format!("SET s3.access_key_id = '{}'", ACCESS_KEY_ID)).await?;
ctx.sql(&format!("SET s3.secret_access_key = '{}'", SECRET_KEY)).await?;
ctx.sql(&format!("SET s3.endpoint = '{}://{}'", PROTOCOLL, HOST)).await?;
ctx.sql("SET s3.allow_http = true").await?;

println!("register table remote");
ctx.register_parquet(
    "test",
    "s3://bucket/foo.parquet",
    ParquetReadOptions::default(),
)
.await?;

// using delta-rs and delta lake
deltalake::aws::register_handlers(None);
let table = open_table("s3a://ka-etu-dih-001-datafusion-001/data/customer").await?;
ctx.register_table("customer", Arc::new(table))?;
```
**Expected behavior**
The scheduler parses the logical plan containing the Delta table and distributes the resulting plans to the executors.
**Additional context**
```sh
Error: Arrow error: External error: Execution error: Fail to execute query due to ExecuteQueryFailureResult { failure: Some(PlanParsingFailure("Could not parse logical plan protobuf: Internal error: Error encoding delta table.\nThis was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker")) }

Caused by:
    0: External error: Execution error: Fail to execute query due to ExecuteQueryFailureResult { failure: Some(PlanParsingFailure("Could not parse logical plan protobuf: Internal error: Error encoding delta table.\nThis was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker")) }
    1: Execution error: Fail to execute query due to ExecuteQueryFailureResult { failure: Some(PlanParsingFailure("Could not parse logical plan protobuf: Internal error: Error encoding delta table.\nThis was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker")) }
```