mgmarino opened a new issue, #9678:
URL: https://github.com/apache/iceberg/issues/9678

   ### Query engine
   
   AWS Athena, verified on Spark/EMR as well (Spark 3.4.1, Iceberg 1.4.3)
   
   ### Question
   
   We are currently looking to migrate several of our old Hive tables to 
Iceberg. Our tables were partitioned like:
   
   `year=../month=../day=..`
   
   and we typically did things like the following:
   
   ```sql
   WHERE
     year || month || day BETWEEN '20230104' AND '20230110'
   ```
   
   to select particular partitions in both Spark and Trino/Athena. It seems that this kind of expression in the `WHERE` predicate does not behave as expected with Iceberg (i.e. it does not partition prune), and I wanted to know whether this is a known limitation. (Apologies, I couldn't find this mentioned anywhere in the docs!)
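
   For context, the partition columns are all strings. A rough sketch of the Hive-style DDL we are migrating from (the non-partition columns and storage format are just placeholders):

   ```sql
   -- Illustrative sketch of the source layout: Hive tables partitioned by the
   -- string columns year/month/day (other columns and the format are placeholders).
   CREATE TABLE prod.table_hive (
     id      BIGINT,
     payload STRING
   )
   PARTITIONED BY (year STRING, month STRING, day STRING)
   STORED AS PARQUET;
   ```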
   
   An example from Athena/Trino:
   
   ```sql
   EXPLAIN ANALYZE
   SELECT COUNT(*) as num_rows
   FROM prod.table_iceberg
   WHERE year = '2024'
     AND month = '1'
   ```
   
   outputs:
   
   ```
   Query Plan
   Queued: 544.36us, Analysis: 502.40ms, Planning: 190.60ms, Execution: 8.55s
   Fragment 1 [SINGLE]
       CPU: 411.84ms, Scheduled: 1.43s, Blocked 33.56s (Input: 26.45s, Output: 0.00ns), Input: 13972 rows (122.80kB), Data Scanned: 0B; per task: avg.: 13972.00 std.dev.: 0.00, Output: 1 row (9B)
       Output layout: [count]
       Output partitioning: SINGLE []
       Aggregate[type = FINAL]
       │   Layout: [count:bigint]
       │   Estimates: {rows: 1 (9B), cpu: 9, memory: 9B, network: 0B}
       │   CPU: 67.00ms (0.01%), Scheduled: 134.00ms (0.00%), Blocked: 0.00ns (0.00%), Output: 1 row (9B)
       │   Input avg.: 13972.00 rows, Input std.dev.: 0.00%
       │   count := count("count_0")
       └─ LocalExchange[partitioning = SINGLE]
          │   Layout: [count_0:bigint]
          │   Estimates: {rows: 1 (9B), cpu: 0, memory: 0B, network: 0B}
          │   CPU: 114.00ms (0.01%), Scheduled: 265.00ms (0.01%), Blocked: 7.15s (21.28%), Output: 13972 rows (122.80kB)
          │   Input avg.: 3493.00 rows, Input std.dev.: 74.24%
          └─ RemoteSource[sourceFragmentIds = [2]]
                 Layout: [count_0:bigint]
                 CPU: 187.00ms (0.02%), Scheduled: 956.00ms (0.03%), Blocked: 26.45s (78.72%), Output: 13972 rows (122.80kB)
                 Input avg.: 3493.00 rows, Input std.dev.: 74.24%
   
   Fragment 2 [SOURCE]
       CPU: 12.95m, Scheduled: 47.38m, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 62896440400 rows (0B), Data Scanned: 0B; per task: avg.: 2096548013.33 std.dev.: 877588981.69, Output: 13972 rows (122.80kB)
       Output layout: [count_0]
       Output partitioning: SINGLE []
       Aggregate[type = PARTIAL]
       │   Layout: [count_0:bigint]
       │   Estimates: {rows: 1 (9B), cpu: 0, memory: 9B, network: 0B}
       │   CPU: 5.30m (40.94%), Scheduled: 15.36m (32.41%), Blocked: 0.00ns (0.00%), Output: 13972 rows (122.80kB)
       │   Input avg.: 4501606.10 rows, Input std.dev.: 103.44%
       │   count_0 := count(*)
       └─ TableScan[table = awsdatacatalog$iceberg-aws:prod.table_iceberg$data@4799610969623733758 constraint on [year, month]]
              Layout: []
              Estimates: {rows: ? (0B), cpu: 0, memory: 0B, network: 0B}
              CPU: 7.64m (59.01%), Scheduled: 32.00m (67.54%), Blocked: 0.00ns (0.00%), Output: 62896440400 rows (0B)
              Input avg.: 4501606.10 rows, Input std.dev.: 103.44%
              181:year:varchar
                  :: [[2024]]
              182:month:varchar
                  :: [[1]]
              Input: 62896440400 rows (0B), Physical input time: 527230.00ms
   ```
   where the output seems to indicate that this is a metadata-only read (0B of data scanned), and it is fairly fast (~9 s).
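
   (The per-partition stats that such a metadata-only count can presumably be served from are visible through the Iceberg `$partitions` metadata table; roughly, in Athena/Trino syntax:)

   ```sql
   -- Rough sanity check of the partition spec and per-partition record counts
   -- via the Iceberg $partitions metadata table (Athena/Trino syntax, same
   -- table names as above):
   SELECT *
   FROM "prod"."table_iceberg$partitions"
   LIMIT 10;
   ```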
   
   The same query using concatenation looks like:
   
   ```sql
   EXPLAIN ANALYZE
   SELECT COUNT(*) as num_rows
   FROM prod.table_iceberg
   WHERE year || lpad(month, 2, '0') = '202401'
   ```
   
   and results in:
   
   ```
   Query Plan
   Queued: 270.97us, Analysis: 920.95ms, Planning: 465.63ms, Execution: 1.21m
   Fragment 1 [SINGLE]
       CPU: 466.36ms, Scheduled: 4.98s, Blocked 3.22m (Input: 2.30m, Output: 0.00ns), Input: 13972 rows (122.80kB), Data Scanned: 0B; per task: avg.: 13972.00 std.dev.: 0.00, Output: 1 row (9B)
       Output layout: [count]
       Output partitioning: SINGLE []
       Aggregate[type = FINAL]
       │   Layout: [count:bigint]
       │   Estimates: {rows: 1 (9B), cpu: 9, memory: 9B, network: 0B}
       │   CPU: 64.00ms (0.00%), Scheduled: 145.00ms (0.00%), Blocked: 0.00ns (0.00%), Output: 1 row (9B)
       │   Input avg.: 13972.00 rows, Input std.dev.: 0.00%
       │   count := count("count_0")
       └─ LocalExchange[partitioning = SINGLE]
          │   Layout: [count_0:bigint]
          │   Estimates: {rows: 1 (9B), cpu: 0, memory: 0B, network: 0B}
          │   CPU: 136.00ms (0.00%), Scheduled: 3.47s (0.00%), Blocked: 57.18s (29.30%), Output: 13972 rows (122.80kB)
          │   Input avg.: 3493.00 rows, Input std.dev.: 116.81%
          └─ RemoteSource[sourceFragmentIds = [2]]
                 Layout: [count_0:bigint]
                 CPU: 186.00ms (0.00%), Scheduled: 1.03s (0.00%), Blocked: 2.30m (70.70%), Output: 13972 rows (122.80kB)
                 Input avg.: 3493.00 rows, Input std.dev.: 116.81%
   
   Fragment 2 [SOURCE]
       CPU: 7.92h, Scheduled: 23.33h, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 62896440400 rows (111.12MB), Data Scanned: 0B; per task: avg.: 2096548013.33 std.dev.: 436430202.74, Output: 13972 rows (122.80kB)
       Output layout: [count_0]
       Output partitioning: SINGLE []
       Aggregate[type = PARTIAL]
       │   Layout: [count_0:bigint]
       │   Estimates: {rows: 1 (9B), cpu: 0, memory: 9B, network: 0B}
       │   CPU: 1.34m (0.28%), Scheduled: 5.13m (0.37%), Blocked: 0.00ns (0.00%), Output: 13972 rows (122.80kB)
       │   Input avg.: 4501606.10 rows, Input std.dev.: 103.44%
       │   count_0 := count(*)
       └─ ScanFilterProject[table = awsdatacatalog$iceberg-aws:prod.table_iceberg$data@4799610969623733758, filterPredicate = (concat("year", lpad("month", BIGINT '2', '0')) = VARCHAR '202401'), projectLocality = LOCAL, protectedBarrier = NONE]
              Layout: []
              Estimates: {rows: ? (0B), cpu: ?, memory: 0B, network: 0B}/{rows: ? (0B), cpu: ?, memory: 0B, network: 0B}/{rows: ? (0B), cpu: 0, memory: 0B, network: 0B}
              CPU: 7.90h (99.72%), Scheduled: 23.25h (99.63%), Blocked: 0.00ns (0.00%), Output: 62896440400 rows (0B)
              Input avg.: 4501606.10 rows, Input std.dev.: 103.44%
              month := 182:month:varchar
              year := 181:year:varchar
              Input: 62896440400 rows (111.12MB), Filtered: 0.00%, Physical input time: 973630.00ms
   ```
   
   which, as far as I can tell, actually needs to scan the data and takes much longer (~70 s).
   
   We see similar behavior in Spark, which is actually where we first came across this while testing the migration.
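
   Roughly what we ran there (the catalog name is just a placeholder; the predicate is the same pattern as above):

   ```sql
   -- Approximate Spark SQL (Spark 3.4.1, Iceberg 1.4.3) reproduction; the
   -- catalog/table names are placeholders. The concatenated predicate shows up
   -- in the plan as an ordinary filter rather than as partition predicates:
   EXPLAIN
   SELECT COUNT(*) as num_rows
   FROM my_catalog.prod.table_iceberg
   WHERE year || month || day BETWEEN '20230104' AND '20230110';
   ```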
   
   If this is a known issue, is there any expectation of when it may be addressed, and/or are there any workarounds aside from rewriting queries (the kind of rewrite we are hoping to avoid is sketched below)? We were hoping to be able to migrate to Iceberg without having to adapt all of our queries (aside from e.g. introducing an Iceberg catalog), but this would seem to limit that. Thanks!
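
   For reference, the kind of per-query rewrite we are hoping to avoid is essentially replacing the concatenated predicate with explicit predicates on the raw partition columns, i.e. for the month-level case above:

   ```sql
   -- Does not prune (evaluated row by row, per the second plan above):
   --   WHERE year || lpad(month, 2, '0') = '202401'
   -- Prunes (metadata-only, per the first plan above):
   SELECT COUNT(*) as num_rows
   FROM prod.table_iceberg
   WHERE year = '2024'
     AND month = '1';
   ```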

