jiacai2050 commented on code in PR #1585:
URL: https://github.com/apache/horaedb/pull/1585#discussion_r1827259234
##########
horaedb/metric_engine/src/storage.rs:
##########
@@ -117,15 +130,44 @@ impl CloudObjectStorage {
AsyncArrowWriter::try_new(object_store_writer,
self.schema().clone(), req.props)
.context("create arrow writer")?;
- // TODO: sort record batch according to primary key columns.
- writer
- .write(&req.batch)
- .await
- .context("write arrow batch")?;
+ // sort record batch
+ let batches = self.sort_batch(req.batch).await?;
+ for batch in batches {
+ writer.write(&batch).await.context("write arrow batch")?;
+ }
writer.close().await.context("close arrow writer")?;
Ok(file_id)
}
+
+ async fn sort_batch(&self, batch: RecordBatch) -> Result<Vec<RecordBatch>>
{
+ let ctx = SessionContext::default();
+ let schema = batch.schema();
+ let df_schema = DFSchema::try_from(schema.clone()).context("build
DFSchema")?;
+
+ let sort_exprs = (0..self.num_primary_key)
+ .collect::<Vec<_>>()
+ .iter()
+ .map(|i| ident(schema.clone().field(*i).name()).sort(true, true))
+ .collect::<Vec<_>>();
+ let physical_sort_exprs =
+ create_physical_sort_exprs(&sort_exprs, &df_schema,
&ExecutionProps::default())
+ .context("create physical sort exprs")?;
+
+ let batch_plan =
+ MemoryExec::try_new(&[vec![batch]], schema, None).context("build
batch plan")?;
+ let physical_plan = SortExec::new(physical_sort_exprs,
Arc::new(batch_plan));
+ let mut stream = physical_plan
Review Comment:
Please use
[collect](https://docs.rs/datafusion/latest/datafusion/physical_plan/fn.collect.html)
to fetch results.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]