echai58 commented on issue #9928:
URL:
https://github.com/apache/arrow-datafusion/issues/9928#issuecomment-2040356047
@alamb @mustafasrepo here's the physical plan output when running with
`RUST_LOG=DEBUG`. This is a Delta table with columns
`ticker, valuation_date, value, _month`, merging on `source.ticker = target.ticker`
(I simplified the merge predicate to keep the plan readable).
```
[2024-04-05T18:00:00Z DEBUG datafusion::physical_planner] Input physical
plan:
ProjectionExec: expr=[__delta_rs_c_ticker@0 as ticker,
__delta_rs_c_valuation_date@1 as valuation_date, __delta_rs_c_value@2 as value,
__delta_rs_c__month@3 as _month]
FilterExec: __delta_rs_delete@4 IS NOT DISTINCT FROM false
ProjectionExec: expr=[__delta_rs_c_ticker@12 as __delta_rs_c_ticker,
__delta_rs_c_valuation_date@13 as __delta_rs_c_valuation_date,
__delta_rs_c_value@14 as __delta_rs_c_value, __delta_rs_c__month@15 as
__delta_rs_c__month, __delta_rs_delete@16 as __delta_rs_delete]
MetricObserverExec id=merge_output_count
MergeBarrier
ProjectionExec: expr=[ticker@0 as ticker, valuation_date@1 as
valuation_date, value@2 as value, _month@3 as _month, __delta_rs_source@4 as
__delta_rs_source, ticker@5 as ticker, valuation_date@6 as valuation_date,
value@7 as value, _month@8 as _month, __delta_rs_path@9 as __delta_rs_path,
__delta_rs_target@10 as __delta_rs_target, __delta_rs_operation@11 as
__delta_rs_operation, CASE __delta_rs_operation@11 WHEN 0 THEN ticker@0 WHEN 1
THEN ticker@0 WHEN 2 THEN CAST(ticker@5 AS LargeUtf8) WHEN 3 THEN CAST(ticker@5
AS LargeUtf8) WHEN 4 THEN CAST(ticker@5 AS LargeUtf8) END as
__delta_rs_c_ticker, CASE __delta_rs_operation@11 WHEN 0 THEN valuation_date@1
WHEN 1 THEN valuation_date@1 WHEN 2 THEN valuation_date@6 WHEN 3 THEN
valuation_date@6 WHEN 4 THEN valuation_date@6 END as
__delta_rs_c_valuation_date, CASE __delta_rs_operation@11 WHEN 0 THEN value@2
WHEN 1 THEN value@2 WHEN 2 THEN value@7 WHEN 3 THEN value@7 WHEN 4 THEN value@7
END as __delta_rs_c_value, CASE
__delta_rs_operation@11 WHEN 0 THEN _month@3 WHEN 1 THEN _month@3 WHEN 2 THEN
CAST(_month@8 AS LargeUtf8) WHEN 3 THEN CAST(_month@8 AS LargeUtf8) WHEN 4 THEN
CAST(_month@8 AS LargeUtf8) END as __delta_rs_c__month, CASE
__delta_rs_operation@11 WHEN 0 THEN false WHEN 1 THEN false WHEN 2 THEN false
WHEN 3 THEN true WHEN 4 THEN false ELSE false END as __delta_rs_delete, CASE
__delta_rs_operation@11 WHEN 0 THEN false WHEN 1 THEN NULL WHEN 2 THEN false
WHEN 3 THEN false WHEN 4 THEN false ELSE false END as __delta_rs_target_insert,
CASE __delta_rs_operation@11 WHEN 0 THEN NULL WHEN 1 THEN false WHEN 2 THEN
false WHEN 3 THEN false WHEN 4 THEN false ELSE false END as
__delta_rs_target_update, CASE __delta_rs_operation@11 WHEN 0 THEN false WHEN 1
THEN false WHEN 2 THEN false WHEN 3 THEN false WHEN 4 THEN false ELSE false END
as __delta_rs_target_delete, CASE __delta_rs_operation@11 WHEN 0 THEN false
WHEN 1 THEN false WHEN 2 THEN NULL WHEN 3 THEN false WHEN 4 THEN NULL ELSE
false END as __delta_rs_target_copy]
ProjectionExec: expr=[ticker@0 as ticker, valuation_date@1
as valuation_date, value@2 as value, _month@3 as _month, __delta_rs_source@4 as
__delta_rs_source, ticker@5 as ticker, valuation_date@6 as valuation_date,
value@7 as value, _month@8 as _month, __delta_rs_path@9 as __delta_rs_path,
__delta_rs_target@10 as __delta_rs_target, CASE WHEN (__delta_rs_source@4 IS
NOT DISTINCT FROM true) AND (__delta_rs_target@10 IS NOT DISTINCT FROM true)
THEN 0 WHEN (__delta_rs_source@4 IS NOT DISTINCT FROM true) AND
__delta_rs_target@10 IS NULL THEN 1 WHEN (__delta_rs_source@4 IS NOT DISTINCT
FROM true) AND (__delta_rs_target@10 IS NOT DISTINCT FROM true) THEN 2 WHEN
(__delta_rs_source@4 IS NOT DISTINCT FROM true) AND __delta_rs_target@10 IS
NULL THEN 3 WHEN __delta_rs_source@4 IS NULL AND __delta_rs_target@10 IS NOT
DISTINCT FROM true THEN 4 END as __delta_rs_operation]
ProjectionExec: expr=[ticker@0 as ticker, valuation_date@1
as valuation_date, value@2 as value, _month@3 as _month, __delta_rs_source@4 as
__delta_rs_source, ticker@5 as ticker, valuation_date@6 as valuation_date,
value@7 as value, _month@8 as _month, __delta_rs_path@9 as __delta_rs_path,
__delta_rs_target@10 as __delta_rs_target]
HashJoinExec: mode=CollectLeft, join_type=Full,
on=[(ticker@0, CAST(t.ticker AS LargeUtf8)@6)]
ProjectionExec: expr=[ticker@0 as ticker,
valuation_date@1 as valuation_date, value@2 as value, _month@3 as _month, true
as __delta_rs_source]
MetricObserverExec id=merge_source_count
ProjectionExec: expr=[ticker@0 as ticker,
valuation_date@1 as valuation_date, value@2 as value, _month@3 as _month]
MemoryExec: partitions=1, partition_sizes=[1]
ProjectionExec: expr=[ticker@0 as ticker,
valuation_date@1 as valuation_date, value@2 as value, _month@3 as _month,
__delta_rs_path@4 as __delta_rs_path, __delta_rs_target@5 as __delta_rs_target,
CAST(ticker@0 AS LargeUtf8) as CAST(t.ticker AS LargeUtf8)]
ProjectionExec: expr=[ticker@0 as ticker,
valuation_date@1 as valuation_date, value@2 as value, _month@3 as _month,
__delta_rs_path@4 as __delta_rs_path, true as __delta_rs_target]
MetricObserverExec id=merge_target_count
DeltaScan
ParquetExec: file_groups={146 groups:
[[_month=2018-08/part-00001-2196a828-b7ac-4406-8b21-63edd7072c0e-c000.snappy.parquet],
[_month=2011-10/part-00001-08fb1ac4-3fc0-43e1-af5f-14765704e60c-c000.snappy.parquet],
[_month=2018-10/part-00001-50d537b7-b59b-415e-a13d-176526552b52-c000.snappy.parquet],
[_month=2011-05/part-00001-2bcb1fa9-c396-4c38-901b-d73cf5443f28-c000.snappy.parquet],
[_month=2016-03/part-00001-5a4f5659-43ee-4c90-931c-0dd0d15d9e4e-c000.snappy.parquet],
...]}, projection=[ticker, valuation_date, value, _month, __delta_rs_path]
[2024-04-05T18:00:00Z DEBUG datafusion::physical_planner] Optimized physical
plan:
ProjectionExec: expr=[__delta_rs_c_ticker@0 as ticker,
__delta_rs_c_valuation_date@1 as valuation_date, __delta_rs_c_value@2 as value,
__delta_rs_c__month@3 as _month]
CoalesceBatchesExec: target_batch_size=8192
FilterExec: __delta_rs_delete@4 IS NOT DISTINCT FROM false
ProjectionExec: expr=[__delta_rs_c_ticker@12 as
__delta_rs_c_ticker, __delta_rs_c_valuation_date@13 as
__delta_rs_c_valuation_date, __delta_rs_c_value@14 as __delta_rs_c_value,
__delta_rs_c__month@15 as __delta_rs_c__month, __delta_rs_delete@16 as
__delta_rs_delete]
MetricObserverExec id=merge_output_count
MergeBarrier
ProjectionExec: expr=[ticker@0 as ticker, valuation_date@1
as valuation_date, value@2 as value, _month@3 as _month, __delta_rs_source@4 as
__delta_rs_source, ticker@5 as ticker, valuation_date@6 as valuation_date,
value@7 as value, _month@8 as _month, __delta_rs_path@9 as __delta_rs_path,
__delta_rs_target@10 as __delta_rs_target, __delta_rs_operation@11 as
__delta_rs_operation, CASE __delta_rs_operation@11 WHEN 0 THEN ticker@0 WHEN 1
THEN ticker@0 WHEN 2 THEN CAST(ticker@5 AS LargeUtf8) WHEN 3 THEN CAST(ticker@5
AS LargeUtf8) WHEN 4 THEN CAST(ticker@5 AS LargeUtf8) END as
__delta_rs_c_ticker, CASE __delta_rs_operation@11 WHEN 0 THEN valuation_date@1
WHEN 1 THEN valuation_date@1 WHEN 2 THEN valuation_date@6 WHEN 3 THEN
valuation_date@6 WHEN 4 THEN valuation_date@6 END as
__delta_rs_c_valuation_date, CASE __delta_rs_operation@11 WHEN 0 THEN value@2
WHEN 1 THEN value@2 WHEN 2 THEN value@7 WHEN 3 THEN value@7 WHEN 4 THEN value@7
END as __delta_rs_c_value, CASE
__delta_rs_operation@11 WHEN 0 THEN _month@3 WHEN 1 THEN _month@3 WHEN 2 THEN
CAST(_month@8 AS LargeUtf8) WHEN 3 THEN CAST(_month@8 AS LargeUtf8) WHEN 4 THEN
CAST(_month@8 AS LargeUtf8) END as __delta_rs_c__month, CASE
__delta_rs_operation@11 WHEN 0 THEN false WHEN 1 THEN false WHEN 2 THEN false
WHEN 3 THEN true WHEN 4 THEN false ELSE false END as __delta_rs_delete, CASE
__delta_rs_operation@11 WHEN 0 THEN false WHEN 1 THEN NULL WHEN 2 THEN false
WHEN 3 THEN false WHEN 4 THEN false ELSE false END as __delta_rs_target_insert,
CASE __delta_rs_operation@11 WHEN 0 THEN NULL WHEN 1 THEN false WHEN 2 THEN
false WHEN 3 THEN false WHEN 4 THEN false ELSE false END as
__delta_rs_target_update, CASE __delta_rs_operation@11 WHEN 0 THEN false WHEN 1
THEN false WHEN 2 THEN false WHEN 3 THEN false WHEN 4 THEN false ELSE false END
as __delta_rs_target_delete, CASE __delta_rs_operation@11 WHEN 0 THEN false
WHEN 1 THEN false WHEN 2 THEN NULL WHEN 3 THEN false WHEN 4 THEN NULL ELSE
false END as __delta_rs_target_copy]
ProjectionExec: expr=[ticker@7 as ticker, valuation_date@8
as valuation_date, value@9 as value, _month@10 as _month, __delta_rs_source@11
as __delta_rs_source, ticker@0 as ticker, valuation_date@1 as valuation_date,
value@2 as value, _month@3 as _month, __delta_rs_path@4 as __delta_rs_path,
__delta_rs_target@5 as __delta_rs_target, CASE WHEN (__delta_rs_source@11 IS
NOT DISTINCT FROM true) AND (__delta_rs_target@5 IS NOT DISTINCT FROM true)
THEN 0 WHEN (__delta_rs_source@11 IS NOT DISTINCT FROM true) AND
__delta_rs_target@5 IS NULL THEN 1 WHEN (__delta_rs_source@11 IS NOT DISTINCT
FROM true) AND (__delta_rs_target@5 IS NOT DISTINCT FROM true) THEN 2 WHEN
(__delta_rs_source@11 IS NOT DISTINCT FROM true) AND __delta_rs_target@5 IS
NULL THEN 3 WHEN __delta_rs_source@11 IS NULL AND __delta_rs_target@5 IS NOT
DISTINCT FROM true THEN 4 END as __delta_rs_operation]
CoalesceBatchesExec: target_batch_size=8192
HashJoinExec: mode=Partitioned, join_type=Full,
on=[(CAST(t.ticker AS LargeUtf8)@6, ticker@0)]
ProjectionExec: expr=[ticker@0 as ticker,
valuation_date@1 as valuation_date, value@2 as value, _month@3 as _month,
__delta_rs_path@4 as __delta_rs_path, true as __delta_rs_target, CAST(ticker@0
AS LargeUtf8) as CAST(t.ticker AS LargeUtf8)]
MetricObserverExec id=merge_target_count
ParquetExec: file_groups={146 groups:
[[_month=2018-08/part-00001-2196a828-b7ac-4406-8b21-63edd7072c0e-c000.snappy.parquet],
[_month=2011-10/part-00001-08fb1ac4-3fc0-43e1-af5f-14765704e60c-c000.snappy.parquet],
[_month=2018-10/part-00001-50d537b7-b59b-415e-a13d-176526552b52-c000.snappy.parquet],
[_month=2011-05/part-00001-2bcb1fa9-c396-4c38-901b-d73cf5443f28-c000.snappy.parquet],
[_month=2016-03/part-00001-5a4f5659-43ee-4c90-931c-0dd0d15d9e4e-c000.snappy.parquet],
...]}, projection=[ticker, valuation_date, value, _month, __delta_rs_path]
ProjectionExec: expr=[ticker@0 as ticker,
valuation_date@1 as valuation_date, value@2 as value, _month@3 as _month, true
as __delta_rs_source]
MetricObserverExec id=merge_source_count
MemoryExec: partitions=1, partition_sizes=[1]
```
Let me know if there is any other log output that would be helpful, thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]