jiangzhx edited a comment on issue #1041:
URL:
https://github.com/apache/arrow-datafusion/issues/1041#issuecomment-926361164
I did more testing: load the parquet file into a MemTable first, then run the SQL query:
SELECT sum(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue FROM
lineorder_flat;
with simd and mimalloc command:
RUSTFLAGS='-C target-cpu=native' cargo +nightly run --release
--features "simd mimalloc" --example ssb_memtable_sum_example
prepare memtable usage millis: 130
sql execution usage millis: 26
with mimalloc command:
RUSTFLAGS='-C target-cpu=native' cargo +nightly run --release
--features "mimalloc" --example ssb_memtable_sum_example
prepare memtable usage millis: 130
sql execution usage millis: 25
data rows: 59,986,052 rows
I guess the reason is what @jorgecarleitao and @alamb said in their comments:
the Rust compiler already compiles the code to use SIMD automatically.
code:
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty::print_batches;
use chrono::prelude::*;
use datafusion::prelude::*;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::datasource::parquet::ParquetTable;
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::logical_plan::Expr;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{common, displayable};
use futures::StreamExt;
use std::sync::Arc;
// Global allocator selection, chosen at compile time via Cargo features.
// Both statics share the name `ALLOC` and both are `#[global_allocator]`,
// so the `snmalloc` and `mimalloc` features must not be enabled together.

// Use snmalloc as the process-wide allocator when the `snmalloc` feature is on.
#[cfg(feature = "snmalloc")]
#[global_allocator]
static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
// Use mimalloc as the process-wide allocator when the `mimalloc` feature is on.
#[cfg(feature = "mimalloc")]
#[global_allocator]
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
let parquet_path =
"/Users/sylar/workspace/opensource/ssb-dbgen/output/parquet_10";
let lineorder_flat_schema = Schema::new(vec![
Field::new("LO_ORDERKEY", DataType::Int64, false),
Field::new("LO_LINENUMBER", DataType::Int64, false),
Field::new("LO_CUSTKEY", DataType::Int64, false),
Field::new("LO_PARTKEY", DataType::Int64, false),
Field::new("LO_SUPPKEY", DataType::Int64, false),
Field::new("LO_ORDERDATE", DataType::Int64, false),
Field::new("LO_ORDERPRIORITY", DataType::Utf8, false),
Field::new("LO_SHIPPRIOTITY", DataType::Int64, false),
Field::new("LO_QUANTITY", DataType::Int64, false),
Field::new("LO_EXTENDEDPRICE", DataType::Int64, false),
Field::new("LO_ORDTOTALPRICE", DataType::Int64, false),
Field::new("LO_DISCOUNT", DataType::Int64, false),
Field::new("LO_REVENUE", DataType::Int64, false),
Field::new("LO_SUPPLYCOST", DataType::Int64, false),
Field::new("LO_TAX", DataType::Int64, false),
Field::new("LO_COMMITDATE", DataType::Int64, false),
Field::new("LO_SHIPMODE", DataType::Utf8, false),
Field::new("C_NAME", DataType::Utf8, false),
Field::new("C_ADDRESS", DataType::Utf8, false),
Field::new("C_CITY", DataType::Utf8, false),
Field::new("C_NATION", DataType::Utf8, false),
Field::new("C_REGION", DataType::Utf8, false),
Field::new("C_PHONE", DataType::Utf8, false),
Field::new("C_MKTSEGMENT", DataType::Utf8, false),
Field::new("S_NAME", DataType::Utf8, false),
Field::new("S_ADDRESS", DataType::Utf8, false),
Field::new("S_CITY", DataType::Utf8, false),
Field::new("S_NATION", DataType::Utf8, false),
Field::new("S_REGION", DataType::Utf8, false),
Field::new("S_PHONE", DataType::Utf8, false),
Field::new("P_NAME", DataType::Utf8, false),
Field::new("P_MFGR", DataType::Utf8, false),
Field::new("P_CATEGORY", DataType::Utf8, false),
Field::new("P_BRAND", DataType::Utf8, false),
Field::new("P_COLOR", DataType::Utf8, false),
Field::new("P_TYPE", DataType::Utf8, false),
Field::new("P_SIZE", DataType::Int64, false),
Field::new("P_CONTAINER", DataType::Utf8, false),
]);
let project_schema = Schema::new(vec![
Field::new("LO_EXTENDEDPRICE", DataType::Int64, false),
Field::new("LO_DISCOUNT", DataType::Int64, false),
]);
let dt = Local::now();
let table_provider = Arc::new(ParquetTable::try_new_with_schema(
parquet_path,
lineorder_flat_schema,
32,
false,
)?);
let exec = table_provider
.scan(&Option::Some(vec![9,11]), 8192, &[], None)
.await?;
let partition_count = exec.output_partitioning().partition_count();
let tasks = (0..partition_count)
.map(|part_i| {
let exec = exec.clone();
tokio::spawn(async move {
let stream = exec.execute(part_i).await?;
common::collect(stream).await
})
})
.collect::<Vec<_>>();
let mut data: Vec<Vec<RecordBatch>> = Vec::new();
for task in tasks {
let result = task.await.expect("MemTable::load could not join
task")?;
data.push(result);
}
let memtable =
MemTable::try_new(SchemaRef::new(project_schema).clone(), data).unwrap();
println!(
"prepare memtable usage millis: {}",
Local::now().timestamp_millis() - dt.timestamp_millis()
);
let execution_config = ExecutionConfig::new();
let mut ctx = ExecutionContext::with_config(execution_config);
ctx.register_table("lineorder_flat", Arc::new(memtable))?;
let dt = Local::now();
let sql = "SELECT sum(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
FROM lineorder_flat";
let df = ctx.sql(sql)?;
let results: Vec<RecordBatch> = df.collect().await?;
print_batches(&results)?;
println!(
"sql execution usage millis: {}",
Local::now().timestamp_millis() - dt.timestamp_millis()
);
Ok(())
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]