Pavan Lanka created ORC-742:
-------------------------------

             Summary: LazyIO of non-filter columns in the presence of filters
                 Key: ORC-742
                 URL: https://issues.apache.org/jira/browse/ORC-742
             Project: ORC
          Issue Type: Improvement
          Components: Reader
            Reporter: Pavan Lanka
            Assignee: Pavan Lanka


h2. Background

This feature request originated from a large search with the following 
characteristics:
 * The search fields are not part of partition, bucket or sort fields.
 * The table is a very large table.
 * The predicates result in very few rows compared to the scan size.
 * The search columns are a significant subset of selection columns in the 
query.

Initial analysis showed a significant benefit from lazily reading the 
non-search columns only when there is a match. The following sections explore 
the design and some benchmarks.
h2. Design

ORC-577 introduced filters during the read process. However, that optimization 
did not skip the IO of non-filter columns.

At a high level, we propose the following read flow:
 * Read the filter columns
 * Apply the filter
 * On the first match, read the non-filter columns

h3. Read

The diagram below shows the modified read process:

{noformat}
                         │
                         │
                         │
┌────────────────────────▼────────────────────────┐
│               ┏━━━━━━━━━━━━━━━━┓                │
│               ┃  Plan Search   ┃                │
│               ┃    Columns     ┃                │
│               ┗━━━━━━━━━━━━━━━━┛                │
│                 Read   │Stripe                  │
└────────────────────────┼────────────────────────┘
                         │
                         ▼


                         │
                         │
┌────────────────────────▼────────────────────────┐
│               ┏━━━━━━━━━━━━━━━━┓                │
│               ┃  Read Search   ┃                │
│               ┃    Columns     ┃◀─────────┐     │
│               ┗━━━━━━━━━━━━━━━━┛          │     │
│                        │              Size = 0  │
│                        ▼                  │     │
│               ┏━━━━━━━━━━━━━━━━┓          │     │
│               ┃  Apply Filter  ┃──────────┘     │
│               ┗━━━━━━━━━━━━━━━━┛                │
│                    Size > 0                     │
│                        │                        │
│                        ▼                        │
│               ┏━━━━━━━━━━━━━━━━┓                │
│               ┃  Plan Select   ┃                │
│               ┃    Columns     ┃                │
│               ┗━━━━━━━━━━━━━━━━┛                │
│                        │                        │
│                        ▼                        │
│               ┏━━━━━━━━━━━━━━━━┓                │
│               ┃  Read Select   ┃                │
│               ┃    Columns     ┃                │
│               ┗━━━━━━━━━━━━━━━━┛                │
│                   Next │Batch                   │
└────────────────────────┼────────────────────────┘
                         │
                         ▼
{noformat}

The changes to the read process are:
 * *Read Stripe* previously planned the read of all (search + select) columns. 
It now plans and fetches only the search columns. The other stripe planning 
optimizations remain unchanged, e.g. partial read planning of the stripe based 
on RowGroup statistics.
 * *Next Batch* denotes the processing that takes place when 
{{RecordReader.nextBatch}} is invoked.
 ** *Read Search Columns* takes place instead of reading all the selected 
columns. This matches the planning performed during *Read Stripe*, where only 
the search columns were planned.
 ** *Apply Filter* applies the filter to the batch, which at this point contains 
only the search columns. The filter result is then evaluated:
 *** *Size = 0* indicates that all records have been filtered out, so the reader 
proceeds to the next batch of search columns.
 *** *Size > 0* indicates that at least one record was accepted by the filter. 
The accepted records need to be populated with the remaining (select) columns.
 ** *Plan Select Columns* is invoked to plan the read of the select columns. The 
planning happens as follows:
 *** Determine the current position of the read within the stripe and plan the 
read of the select columns from this point to the end of the stripe.
 *** The read planning of the select columns respects the row groups already 
filtered out by the stripe planning.
 *** Fetch the select columns using the above plan.
 ** *Read Select Columns* reads the select columns into the vectorized row batch.
 ** The batch is returned.
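The *Next Batch* flow above can be sketched as a small, self-contained Java model. This is not the ORC {{RecordReader}} API: the class {{LazyStripeReader}}, its in-memory search/select arrays and the {{IntPredicate}} filter are hypothetical stand-ins, and "reading" a column is simulated by indexing an array. It only illustrates the control flow: batches whose filter result has size 0 never touch the select column, and the select-column plan is made once, at the first matching batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Hypothetical in-memory model of one stripe; NOT the ORC RecordReader API.
final class LazyStripeReader {
  private final int[] searchCol;     // filter ("search") column
  private final String[] selectCol;  // non-filter ("select") column
  private final int batchSize;
  private final IntPredicate filter;
  private int position = 0;          // next row to read in the stripe
  int selectPlanStart = -1;          // row from which select columns were planned (-1 = not yet)
  int searchBatchesRead = 0;         // number of search-column batches read so far

  LazyStripeReader(int[] searchCol, String[] selectCol, int batchSize, IntPredicate filter) {
    this.searchCol = searchCol;
    this.selectCol = selectCol;
    this.batchSize = batchSize;
    this.filter = filter;
  }

  /** Returns select values of the accepted rows in the next non-empty batch, or null at stripe end. */
  List<String> nextBatch() {
    while (position < searchCol.length) {
      int start = position;
      int end = Math.min(start + batchSize, searchCol.length);
      position = end;
      searchBatchesRead++;                        // Read Search Columns

      List<Integer> accepted = new ArrayList<>(); // Apply Filter
      for (int row = start; row < end; row++) {
        if (filter.test(searchCol[row])) {
          accepted.add(row);
        }
      }
      if (accepted.isEmpty()) {
        continue;                                 // Size = 0: next batch of search columns
      }
      if (selectPlanStart < 0) {
        selectPlanStart = start;                  // Plan Select Columns: from here to stripe end
      }
      List<String> result = new ArrayList<>();    // Read Select Columns for the accepted rows
      for (int row : accepted) {
        result.add(selectCol[row]);
      }
      return result;
    }
    return null;                                  // end of stripe
  }
}
```

For example, with 8 rows, a batch size of 2 and a filter accepting the values 5 and 8, the first two search batches are skipped without any select-column access, the select plan starts at row 4, and the two calls return the select values for 5 and 8.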

The current implementation performs a single read for the select columns in a 
stripe.

{noformat}
┌──────────────────────────────────────────────────┐
│ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │
│ │RG0 │ │RG1 │ │RG2■│ │RG3 │ │RG4 │ │RG5■│ │RG6 │ │
│ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ │
│                      Stripe                      │
└──────────────────────────────────────────────────┘
{noformat}

The diagram above depicts a stripe with 7 row groups, of which *RG2* and *RG5* 
are selected by the filter. The current implementation does the following:
 * Start the read planning from the first match, RG2
 * Read to the end of the stripe, which includes RG6
 * As a result, the fetch skips RG0 and RG1, subject to compression block 
boundaries
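The planning of this single read can be sketched as follows. This is an illustrative model, not ORC's planning code, and it assumes (hypothetically) that the byte offset in the select-column stream where each row group starts is known, and that the compression chunk start offsets are known, sorted, and begin at offset 0. The read starts at the compression chunk containing the first selected row group and runs to the end of the stream; {{SingleSelectReadPlan}} is a hypothetical name.

```java
import java.util.Arrays;

// Hypothetical model of planning the single select-column read; NOT ORC's planning code.
final class SingleSelectReadPlan {
  /**
   * @param selectedRg  which row groups survived the filter
   * @param rgStart     byte offset in the stream where each row group starts (assumed known)
   * @param chunkStarts sorted compression chunk start offsets; chunkStarts[0] is assumed to be 0
   * @param streamEnd   byte offset of the end of the stream within the stripe
   * @return {start, end} byte range to fetch, or null if no row group is selected
   */
  static long[] plan(boolean[] selectedRg, long[] rgStart, long[] chunkStarts, long streamEnd) {
    int first = -1;
    for (int rg = 0; rg < selectedRg.length; rg++) {
      if (selectedRg[rg]) {
        first = rg;                    // first matching row group, e.g. RG2
        break;
      }
    }
    if (first < 0) {
      return null;                     // nothing selected: no select-column read
    }
    // A compressed stream can only be decoded from a chunk boundary, so back up to
    // the start of the chunk that contains the first selected row group.
    int idx = Arrays.binarySearch(chunkStarts, rgStart[first]);
    int chunk = idx >= 0 ? idx : -idx - 2;
    return new long[] {chunkStarts[chunk], streamEnd}; // read to the end of the stripe
  }
}
```

With row groups starting every 100 bytes and chunks starting at 0, 150, 350 and 550, selecting RG2 and RG5 yields the byte range [150, 700): RG0 is skipped entirely, but RG1 is only partly skipped because RG2 begins inside the chunk that starts at 150.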

This logic could be enhanced to perform, say, *2 or n* reads before reading to 
the end of the stripe; the current implementation allows 0 reads before reading 
to the end of the stripe. The value of *n* could be configurable, but should be 
chosen to avoid too many short reads.

With multiple reads allowed within a stripe for the select columns, the read 
behavior changes as follows:

{noformat}
┌──────────────────────────────────────────────────┐
│ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │
│ │    │ │    │ │■■■■│ │■■■■│ │■■■■│ │■■■■│ │■■■■│ │
│ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ │
│              Current implementation              │
└──────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────┐
│ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │
│ │    │ │    │ │■■■■│ │    │ │    │ │■■■■│ │■■■■│ │
│ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ │
│               Allow 1 partial read               │
└──────────────────────────────────────────────────┘
{noformat}

The figure shows that one additional read before reading to the end of the 
stripe lets us skip RG3 and RG4, reading significantly fewer bytes. This will be 
included as a subsequent enhancement to this patch.
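One possible policy for the *n* partial reads is sketched below. It is an assumption, not part of this patch: each partial read covers one contiguous run of selected row groups, and once *n* partial reads have been used, a final read runs from the next selected row group to the end of the stripe. Byte offsets and compression chunk boundaries are ignored; ranges are half-open row group indices, and {{PartialReadPlanner}} is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical planner for the "n partial reads" enhancement; NOT part of ORC.
final class PartialReadPlanner {
  /** Returns half-open [startRg, endRg) row group ranges to fetch for the select columns. */
  static List<int[]> plan(boolean[] selected, int maxPartialReads) {
    List<int[]> reads = new ArrayList<>();
    int rg = 0;
    int partialReads = 0;
    while (true) {
      while (rg < selected.length && !selected[rg]) {
        rg++;                                        // skip row groups rejected by the filter
      }
      if (rg == selected.length) {
        break;                                       // no selected row groups remain
      }
      if (partialReads == maxPartialReads) {
        reads.add(new int[] {rg, selected.length});  // final read: to the end of the stripe
        break;
      }
      int start = rg;
      while (rg < selected.length && selected[rg]) {
        rg++;                                        // extend over a run of selected row groups
      }
      reads.add(new int[] {start, rg});              // one short partial read
      partialReads++;
    }
    return reads;
  }
}
```

For the stripe in the figures (RG2 and RG5 selected), {{maxPartialReads = 0}} yields the single range [2, 7) of the current implementation, while {{maxPartialReads = 1}} yields [2, 3) and [5, 7), matching the "Allow 1 partial read" layout.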
h2. Tests

This offers significant IO and CPU savings in scans where the predicates select 
very few rows.

Our tests used the following table and scan:
 * Table
 ** Size: ~*420 TB*
 ** Data fields: ~*120*
 ** Partition fields: *3*
 * Scan
 ** Search fields: 3 data fields with large IN clauses (~1000 values) combined 
with *OR*.
 ** Select fields: 16 data fields (includes the 3 search fields), 1 partition 
field
 ** Search:
 *** Size: ~*180 TB*
 *** Records: *3.99 T*
 ** Selected:
 *** Size: ~*100 MB*
 *** Records: *1 M*

We have observed the following reductions:
||Test||IO Reduction %||CPU Reduction %||
|SELECT 16 fields|45|47|
|SELECT *|70|87|
 * The savings grow as the number of select columns increases relative to the 
number of search columns.
 * When the filter selects most of the data, no significant penalty was observed 
from performing 2 IOs instead of a single IO.
 ** There is, however, a penalty from applying the filter to the selected 
records.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
