This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 6e4abf5177 test: Port (last) `repartition.rs` query to sqllogictest (#8936)
6e4abf5177 is described below

commit 6e4abf517750d7dde2a5d527e76d361fdcd16cd0
Author: Dejan Simic <[email protected]>
AuthorDate: Thu Jan 25 22:22:04 2024 +0100

    test: Port (last) `repartition.rs` query to sqllogictest (#8936)
    
    * Migrate last repartition query
    
    * Add reference to issue
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Fix missing statement
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/core/tests/sql/mod.rs                   |  1 -
 datafusion/core/tests/sql/repartition.rs           | 59 ----------------------
 datafusion/sqllogictest/test_files/repartition.slt | 56 ++++++++++++++++++++
 3 files changed, 56 insertions(+), 60 deletions(-)

diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 2389c86306..246191e48a 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -71,7 +71,6 @@ pub mod create_drop;
 pub mod explain_analyze;
 pub mod expr;
 pub mod joins;
-pub mod repartition;
 pub mod select;
 mod sql_api;
 
diff --git a/datafusion/core/tests/sql/repartition.rs b/datafusion/core/tests/sql/repartition.rs
deleted file mode 100644
index 332f18e941..0000000000
--- a/datafusion/core/tests/sql/repartition.rs
+++ /dev/null
@@ -1,59 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use arrow::array::UInt32Array;
-use arrow::datatypes::{DataType, Field, Schema};
-use arrow::record_batch::RecordBatch;
-use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
-use datafusion::physical_plan::repartition::RepartitionExec;
-use datafusion::physical_plan::{ExecutionPlan, Partitioning};
-use datafusion::prelude::{SessionConfig, SessionContext};
-use datafusion::test_util::UnboundedExec;
-use datafusion_common::Result;
-use datafusion_physical_expr::expressions::Column;
-use datafusion_physical_expr::PhysicalExpr;
-use futures::StreamExt;
-use std::sync::Arc;
-
-/// See <https://github.com/apache/arrow-datafusion/issues/5278>
-#[tokio::test]
-async fn unbounded_repartition() -> Result<()> {
-    let config = SessionConfig::new();
-    let ctx = SessionContext::new_with_config(config);
-    let task = ctx.task_ctx();
-    let schema = Arc::new(Schema::new(vec![Field::new("a2", DataType::UInt32, false)]));
-    let batch = RecordBatch::try_new(
-        Arc::clone(&schema),
-        vec![Arc::new(UInt32Array::from(vec![1]))],
-    )?;
-    let input = Arc::new(UnboundedExec::new(None, batch.clone(), 1));
-    let on: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("a2", 0))];
-    let plan = Arc::new(RepartitionExec::try_new(input, Partitioning::Hash(on, 3))?);
-    let plan = Arc::new(CoalescePartitionsExec::new(plan.clone()));
-    let mut stream = plan.execute(0, task)?;
-
-    // Note: `tokio::time::timeout` does NOT help here because in the mentioned issue, the whole runtime is blocked by a
-    // CPU-spinning thread. Using a multithread runtime with multiple threads is NOT a solution since this would not
-    // trigger the bug (the bug is not specific to a single-thread RT though, it's just the only way to trigger it reliably).
-    let batch_actual = stream
-        .next()
-        .await
-        .expect("not terminated")
-        .expect("no error in stream");
-    assert_eq!(batch_actual, batch);
-    Ok(())
-}
diff --git a/datafusion/sqllogictest/test_files/repartition.slt b/datafusion/sqllogictest/test_files/repartition.slt
index 9829299f43..7c141adf82 100644
--- a/datafusion/sqllogictest/test_files/repartition.slt
+++ b/datafusion/sqllogictest/test_files/repartition.slt
@@ -71,3 +71,59 @@ AggregateExec: mode=FinalPartitioned, gby=[column1@0 as column1], aggr=[SUM(parq
 # Cleanup
 statement ok
 DROP TABLE parquet_table;
+
+
+
+# Unbounded repartition
+# See https://github.com/apache/arrow-datafusion/issues/5278
+# Set up unbounded table and run a query - the query plan should display a `RepartitionExec`
+# and a `CoalescePartitionsExec`
+statement ok
+CREATE UNBOUNDED EXTERNAL TABLE sink_table (
+        c1  VARCHAR NOT NULL,
+        c2  TINYINT NOT NULL,
+        c3  SMALLINT NOT NULL,
+        c4  SMALLINT NOT NULL,
+        c5  INTEGER NOT NULL,
+        c6  BIGINT NOT NULL,
+        c7  SMALLINT NOT NULL,
+        c8  INT NOT NULL,
+        c9  INT UNSIGNED NOT NULL,
+        c10 BIGINT UNSIGNED NOT NULL,
+        c11 FLOAT NOT NULL,
+        c12 DOUBLE NOT NULL,
+        c13 VARCHAR NOT NULL
+    )
+STORED AS CSV
+WITH HEADER ROW
+LOCATION '../../testing/data/csv/aggregate_test_100.csv';
+
+query TII
+SELECT c1, c2, c3 FROM sink_table WHERE c3 > 0 LIMIT 5;
+----
+c 2 1
+b 1 29
+e 3 104
+a 3 13
+d 1 38
+
+statement ok
+set datafusion.execution.target_partitions = 3;
+
+statement ok
+set datafusion.optimizer.enable_round_robin_repartition = true;
+
+query TT
+EXPLAIN SELECT c1, c2, c3 FROM sink_table WHERE c3 > 0 LIMIT 5;
+----
+logical_plan
+Limit: skip=0, fetch=5
+--Filter: sink_table.c3 > Int16(0)
+----TableScan: sink_table projection=[c1, c2, c3]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+--CoalescePartitionsExec
+----CoalesceBatchesExec: target_batch_size=8192
+------FilterExec: c3@2 > 0
+--------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
+----------StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true

Reply via email to