alamb commented on code in PR #6347:
URL: https://github.com/apache/arrow-datafusion/pull/6347#discussion_r1192807737
##########
datafusion/core/src/physical_plan/memory.rs:
##########
@@ -529,261 +368,4 @@ mod tests {
Ok(())
}
- #[tokio::test]
- async fn test_insert_into() -> Result<()> {
- // Create session context
- let config = SessionConfig::new().with_target_partitions(8);
- let ctx = SessionContext::with_config(config);
- let testdata = test_util::arrow_test_data();
- let schema = test_util::aggr_test_schema();
- ctx.register_csv(
- "aggregate_test_100",
- &format!("{testdata}/csv/aggregate_test_100.csv"),
- CsvReadOptions::new().schema(&schema),
- )
- .await?;
- ctx.sql(
- "CREATE TABLE table_without_values(field1 BIGINT NULL, field2
BIGINT NULL)",
- )
- .await?;
-
- let sql = "INSERT INTO table_without_values SELECT
- SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1
PRECEDING AND 1 FOLLOWING),
- COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1
PRECEDING AND 1 FOLLOWING)
- FROM aggregate_test_100
- ORDER by c1
- ";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted =
displayable(physical_plan.as_ref()).indent().to_string();
- let expected = {
- vec![
- "MemoryWriteExec: partitions=1, input_partition=1",
- " ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION
BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, COUNT(UInt8(1)) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2]",
- " SortPreservingMergeExec: [c1@2 ASC NULLS LAST]",
- " ProjectionExec: expr=[SUM(aggregate_test_100.c4)
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4),
COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1
FOLLOWING@4 as COUNT(UInt8(1)), c1@0 as c1]",
- " BoundedWindowAggExec:
wdw=[SUM(aggregate_test_100.c4): Ok(Field { name:
\"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) },
COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:
WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound:
Following(UInt64(1)) }], mode=[Sorted]",
- " SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS
LAST]",
- " CoalesceBatchesExec: target_batch_size=8192",
- " RepartitionExec: partitioning=Hash([Column {
name: \"c1\", index: 0 }], 8), input_partitions=8",
- " RepartitionExec:
partitioning=RoundRobinBatch(8), input_partitions=1",
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
- Ok(())
- }
-
- #[tokio::test]
- async fn test_insert_into_as_select_multi_partitioned() -> Result<()> {
- // Create session context
- let config = SessionConfig::new().with_target_partitions(8);
- let ctx = SessionContext::with_config(config);
- let testdata = test_util::arrow_test_data();
- let schema = test_util::aggr_test_schema();
- ctx.register_csv(
- "aggregate_test_100",
- &format!("{testdata}/csv/aggregate_test_100.csv"),
- CsvReadOptions::new().schema(&schema),
- )
- .await?;
- ctx.sql(
- "CREATE TABLE table_without_values(field1 BIGINT NULL, field2
BIGINT NULL)",
- )
- .await?;
-
- let sql = "INSERT INTO table_without_values SELECT
- SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1
PRECEDING AND 1 FOLLOWING) as a1,
- COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1
PRECEDING AND 1 FOLLOWING) as a2
- FROM aggregate_test_100";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted =
displayable(physical_plan.as_ref()).indent().to_string();
- let expected = {
- vec![
- "MemoryWriteExec: partitions=1, input_partition=1",
- " CoalescePartitionsExec",
- " ProjectionExec: expr=[SUM(aggregate_test_100.c4)
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(UInt8(1))
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]",
- " BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4):
Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable:
true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1))
}, COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:
WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound:
Following(UInt64(1)) }], mode=[Sorted]",
- " SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS
LAST]",
- " CoalesceBatchesExec: target_batch_size=8192",
- " RepartitionExec: partitioning=Hash([Column {
name: \"c1\", index: 0 }], 8), input_partitions=8",
- " RepartitionExec:
partitioning=RoundRobinBatch(8), input_partitions=1",
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
- Ok(())
- }
-
- // TODO: The generated plan is suboptimal since SortExec is in global
state.
- #[tokio::test]
- async fn test_insert_into_as_select_single_partition() -> Result<()> {
- // Create session context
- let config = SessionConfig::new().with_target_partitions(8);
- let ctx = SessionContext::with_config(config);
- let testdata = test_util::arrow_test_data();
- let schema = test_util::aggr_test_schema();
- ctx.register_csv(
- "aggregate_test_100",
- &format!("{testdata}/csv/aggregate_test_100.csv"),
- CsvReadOptions::new().schema(&schema),
- )
- .await?;
- ctx.sql("CREATE TABLE table_without_values AS SELECT
- SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1
PRECEDING AND 1 FOLLOWING) as a1,
- COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1
PRECEDING AND 1 FOLLOWING) as a2
- FROM aggregate_test_100")
- .await?;
-
- let sql = "INSERT INTO table_without_values SELECT
- SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1
PRECEDING AND 1 FOLLOWING) as a1,
- COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1
PRECEDING AND 1 FOLLOWING) as a2
- FROM aggregate_test_100
- ORDER BY c1";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted =
displayable(physical_plan.as_ref()).indent().to_string();
- let expected = {
- vec![
- "MemoryWriteExec: partitions=8, input_partition=8",
- " RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " ProjectionExec: expr=[a1@0 as a1, a2@1 as a2]",
- " SortPreservingMergeExec: [c1@2 ASC NULLS LAST]",
- " ProjectionExec: expr=[SUM(aggregate_test_100.c4)
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as a1, COUNT(UInt8(1))
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as a2, c1@0 as c1]",
- " BoundedWindowAggExec:
wdw=[SUM(aggregate_test_100.c4): Ok(Field { name:
\"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) },
COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame:
WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound:
Following(UInt64(1)) }], mode=[Sorted]",
- " SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC
NULLS LAST]",
- " CoalesceBatchesExec: target_batch_size=8192",
- " RepartitionExec: partitioning=Hash([Column {
name: \"c1\", index: 0 }], 8), input_partitions=8",
- " RepartitionExec:
partitioning=RoundRobinBatch(8), input_partitions=1",
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
- Ok(())
- }
-
- // DummyPartition is a simple implementation of the PartitionStream trait.
- // It produces a stream of record batches with a fixed schema and the same
content.
- struct DummyPartition {
- schema: SchemaRef,
- batch: RecordBatch,
- num_batches: usize,
- }
-
- impl PartitionStream for DummyPartition {
- // Return a reference to the schema of this partition.
- fn schema(&self) -> &SchemaRef {
- &self.schema
- }
-
- // Execute the partition stream, producing a stream of record batches.
- fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream
{
- let batches = itertools::repeat_n(self.batch.clone(),
self.num_batches);
- Box::pin(RecordBatchStreamAdapter::new(
- self.schema.clone(),
- futures::stream::iter(batches).map(Ok),
- ))
- }
- }
-
- // Test the less-lock mode by inserting a large number of batches into a
table.
- #[tokio::test]
- async fn test_one_to_one_mode() -> Result<()> {
Review Comment:
I am not quite sure how to port these tests yet -- I am thinking about it
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]