jiacai2050 commented on code in PR #1585:
URL: https://github.com/apache/horaedb/pull/1585#discussion_r1827146902
##########
horaedb/metric_engine/src/storage.rs:
##########
@@ -117,15 +130,44 @@ impl CloudObjectStorage {
AsyncArrowWriter::try_new(object_store_writer,
self.schema().clone(), req.props)
.context("create arrow writer")?;
- // TODO: sort record batch according to primary key columns.
- writer
- .write(&req.batch)
- .await
- .context("write arrow batch")?;
+ // sort record batch
+ let batch = self.sort_batch(req.batch).await?;
+ writer.write(&batch).await.context("write arrow batch")?;
writer.close().await.context("close arrow writer")?;
Ok(file_id)
}
+
+ async fn sort_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
+ let ctx = SessionContext::default();
+ let schema = batch.schema();
+ let df_schema = DFSchema::try_from(schema.clone()).context("build
DFSchema")?;
+
+ let sort_exprs = self
+ .primary_key_indexs
+ .iter()
+ .map(|i| ident(schema.clone().field(*i).name()).sort(true, true))
+ .collect::<Vec<_>>();
+ let physical_sort_exprs =
+ create_physical_sort_exprs(&sort_exprs, &df_schema,
&ExecutionProps::default())
+ .context("create physical sort exprs")?;
+
+ let batch_plan =
+ MemoryExec::try_new(&[vec![batch]], schema, None).context("build
batch plan")?;
+ let physical_plan = SortExec::new(physical_sort_exprs,
Arc::new(batch_plan));
+ let mut stream = physical_plan
+ .execute(0, ctx.task_ctx())
+ .context("execute sort physical plan")?;
+
+ let mut batches = Vec::new();
+ while let Some(batch) = stream.next().await {
Review Comment:
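The batch returned from `next()` is not guaranteed to match the input batch. The sort stream may yield the sorted rows split across several batches of a different size, so all batches from the stream should be collected and concatenated rather than assuming a single batch.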
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]