alamb commented on code in PR #4743:
URL: https://github.com/apache/arrow-datafusion/pull/4743#discussion_r1057694031
##########
datafusion/core/tests/parquet/filter_pushdown.rs:
##########
@@ -42,60 +42,64 @@ use tempfile::TempDir;
use test_utils::AccessLogGenerator;
/// how many rows of generated data to write to our parquet file (arbitrary)
-const NUM_ROWS: usize = 53819;
-const ROW_LIMIT: usize = 4096;
-
-#[cfg(test)]
-#[ctor::ctor]
-fn init() {
- // enable logging so RUST_LOG works
- let _ = env_logger::try_init();
-}
-
-#[cfg(not(target_family = "windows"))]
-// Use multi-threaded executor as this test consumes CPU
-#[tokio::test(flavor = "multi_thread")]
-async fn single_file() {
- // Only create the parquet file once as it is fairly large
-
- let tempdir = TempDir::new().unwrap();
+const NUM_ROWS: usize = 4096;
+fn generate_file(tempdir: &TempDir, props: WriterProperties) ->
TestParquetFile {
+ // Tune down the generator for smaller files
let generator = AccessLogGenerator::new()
.with_row_limit(NUM_ROWS)
- .with_max_batch_size(ROW_LIMIT);
+ .with_pods_per_host(1..4)
+ .with_containers_per_pod(1..2)
+ .with_entries_per_container(128..256);
- // default properties
- let props = WriterProperties::builder().build();
let file = tempdir.path().join("data.parquet");
let start = Instant::now();
println!("Writing test data to {:?}", file);
- let test_parquet_file =
- Arc::new(TestParquetFile::try_new(file, props, generator).unwrap());
+ let test_parquet_file = TestParquetFile::try_new(file, props,
generator).unwrap();
println!(
"Completed generating test data in {:?}",
Instant::now() - start
);
+ test_parquet_file
+}
+
+#[cfg(test)]
+#[ctor::ctor]
+fn init() {
+ // enable logging so RUST_LOG works
+ let _ = env_logger::try_init();
+}
+
+#[cfg(not(target_family = "windows"))]
+#[tokio::test]
+async fn single_file() {
+ // Only create the parquet file once as it is fairly large
- let mut set = tokio::task::JoinSet::new();
Review Comment:
cc @waynexia
##########
datafusion/core/tests/parquet/filter_pushdown.rs:
##########
@@ -107,109 +110,108 @@ async fn single_file() {
.unwrap(),
)
.with_pushdown_expected(PushdownExpected::Some)
- .with_expected_rows(1731);
- set.spawn(async move { case.run().await });
+ .with_expected_rows(135);
+ case.run().await;
- let case = TestCase::new(test_parquet_file.clone())
+ let case = TestCase::new(&test_parquet_file)
.with_name("everything")
// filter filters everything (no row has this status)
// response_status = 429
.with_filter(col("response_status").eq(lit(429_u16)))
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(0);
- set.spawn(async move { case.run().await });
+ case.run().await;
- let case = TestCase::new(test_parquet_file.clone())
+ let case = TestCase::new(&test_parquet_file)
.with_name("nothing")
// No rows are filtered out -- all are returned
// response_status > 0
.with_filter(col("response_status").gt(lit(0_u16)))
.with_pushdown_expected(PushdownExpected::None)
.with_expected_rows(NUM_ROWS);
- set.spawn(async move { case.run().await });
+ case.run().await;
- let case = TestCase::new(test_parquet_file.clone())
+ let case = TestCase::new(&test_parquet_file)
.with_name("dict_selective")
// container = 'backend_container_0'
.with_filter(col("container").eq(lit("backend_container_0")))
.with_pushdown_expected(PushdownExpected::Some)
- .with_expected_rows(15911);
- set.spawn(async move { case.run().await });
+ .with_expected_rows(802);
+ case.run().await;
- let case = TestCase::new(test_parquet_file.clone())
+ let case = TestCase::new(&test_parquet_file)
.with_name("not eq")
// container != 'backend_container_0'
.with_filter(col("container").not_eq(lit("backend_container_0")))
.with_pushdown_expected(PushdownExpected::Some)
- .with_expected_rows(37908);
- set.spawn(async move { case.run().await });
+ .with_expected_rows(3294);
+ case.run().await;
- let case = TestCase::new(test_parquet_file.clone())
+ let case = TestCase::new(&test_parquet_file)
.with_name("dict_conjunction")
// container == 'backend_container_0' AND
- // pod = 'aqcathnxqsphdhgjtgvxsfyiwbmhlmg'
+ // pod = 'cvcjfhwtjttxhiugepoojxrplihywu'
.with_filter(
conjunction([
col("container").eq(lit("backend_container_0")),
- col("pod").eq(lit("aqcathnxqsphdhgjtgvxsfyiwbmhlmg")),
+ col("pod").eq(lit("cvcjfhwtjttxhiugepoojxrplihywu")),
])
.unwrap(),
)
.with_pushdown_expected(PushdownExpected::Some)
- .with_expected_rows(3052);
- set.spawn(async move { case.run().await });
+ .with_expected_rows(134);
+ case.run().await;
- let case = TestCase::new(test_parquet_file.clone())
+ let case = TestCase::new(&test_parquet_file)
.with_name("dict_very_selective")
// request_bytes > 2B AND
// container == 'backend_container_0' AND
- // pod = 'aqcathnxqsphdhgjtgvxsfyiwbmhlmg'
+ // pod = 'cvcjfhwtjttxhiugepoojxrplihywu'
.with_filter(
conjunction([
col("request_bytes").gt(lit(2000000000)),
col("container").eq(lit("backend_container_0")),
- col("pod").eq(lit("aqcathnxqsphdhgjtgvxsfyiwbmhlmg")),
+ col("pod").eq(lit("cvcjfhwtjttxhiugepoojxrplihywu")),
])
.unwrap(),
)
.with_pushdown_expected(PushdownExpected::Some)
- .with_expected_rows(88);
- set.spawn(async move { case.run().await });
+ .with_expected_rows(2);
+ case.run().await;
- let case = TestCase::new(test_parquet_file.clone())
+ let case = TestCase::new(&test_parquet_file)
.with_name("dict_very_selective2")
- // picks only 2 rows
// client_addr = '204.47.29.82' AND
// container == 'backend_container_0' AND
- // pod = 'aqcathnxqsphdhgjtgvxsfyiwbmhlmg'
+ // pod = 'cvcjfhwtjttxhiugepoojxrplihywu'
.with_filter(
conjunction(vec![
- col("request_bytes").gt(lit(2000000000)),
+ col("client_addr").eq(lit("58.242.143.99")),
Review Comment:
Is the case of a pruning on a non equality predicate on a non dictionary
encoded column covered elsewhere?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]