[ 
https://issues.apache.org/jira/browse/DRILL-6829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16676056#comment-16676056
 ] 

Aman Sinha commented on DRILL-6829:
-----------------------------------

Here's one approach.  Other approaches are also welcome and we can consider the 
pros and cons.  

 
Consider a Sort on (a1, b1)
 
*Scenario 1*: schema change from one numeric type to another numeric type
Batch 1: a1: INT, b1: INT       (schema 1)
Batch 2: a1: FLOAT, b1: INT     (schema 2)
 
*Scenario 2*: schema change from numeric to string type 
Batch 1: a1: INT, b1: INT       (schema 1)
Batch 2: a1: STRING, b1: INT    (schema 3)
 
*Scenario 3:* schema change from numeric to string to float 
Batch 1: a1: INT, b1: INT       (schema 1)
Batch 2: a1: STRING, b1: INT    (schema 3)
Batch 3: a1: FLOAT, b1: INT     (schema 2)
 
Let's define *compatible schemas:* 
 Schema 1 is compatible with schema 2 since INT and FLOAT are both numeric 
types. 
 Schemas 1 and 2 are not compatible with schema 3. Technically one could 
cast the numeric types to string, but if the user has not explicitly done a 
CAST then we cannot impose it. 
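
A minimal sketch of this compatibility check, in Python for brevity. The category names, the TYPE_CATEGORY table and the compatible() helper are illustrative assumptions, not existing Drill code; grouping DATE with TIMESTAMP follows the output-type example further down.

```python
from enum import Enum


class Category(Enum):
    NUMERIC = "numeric"
    STRING = "string"
    DATETIME = "datetime"


# Hypothetical mapping from a type name to its compatibility category.
TYPE_CATEGORY = {
    "INT": Category.NUMERIC,
    "BIGINT": Category.NUMERIC,
    "FLOAT": Category.NUMERIC,
    "DOUBLE": Category.NUMERIC,
    "STRING": Category.STRING,
    "DATE": Category.DATETIME,
    "TIMESTAMP": Category.DATETIME,
}


def compatible(schema_a, schema_b, sort_keys):
    """Schemas are compatible if every sort key stays in the same category."""
    return all(
        TYPE_CATEGORY[schema_a[key]] is TYPE_CATEGORY[schema_b[key]]
        for key in sort_keys
    )


schema1 = {"a1": "INT", "b1": "INT"}
schema2 = {"a1": "FLOAT", "b1": "INT"}
schema3 = {"a1": "STRING", "b1": "INT"}
```

With these definitions, compatible(schema1, schema2, ["a1", "b1"]) is true and compatible(schema1, schema3, ["a1", "b1"]) is false. No implicit cast to string is attempted.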
 
Define *sort order* among different schemas:
For compatible schemas, this is straightforward. For instance, there is a 
natural sort order among all numeric types: INT, BIGINT, FLOAT, DOUBLE.  (TODO: 
need to define behavior for Decimal type.) 
What's the sort order between incompatible schemas?  We can have a policy that 
says by default numeric values appear first followed by String values followed 
by Date values etc.  This is similar to NULLS first/NULLS last. This policy 
will appear in external documentation so users and BI tools are aware of the 
expected order.  In the future we may add a declarative way of changing the 
default sort order between incompatible schemas. 
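
One way to picture the default policy is a sort key that ranks the category first and the value second. The sketch below is in Python and only illustrates the idea; CATEGORY_RANK, category_of() and sort_key() are hypothetical names, and the numeric < string < date ranking is the assumed default described above.

```python
from datetime import date

# Assumed default policy: numeric values first, then strings, then dates.
CATEGORY_RANK = {"numeric": 0, "string": 1, "date": 2}


def category_of(value):
    """Classify a value into one of the categories used by the policy."""
    if isinstance(value, bool):
        raise TypeError("boolean ordering is not defined in this sketch")
    if isinstance(value, (int, float)):
        return "numeric"
    if isinstance(value, str):
        return "string"
    if isinstance(value, date):
        return "date"
    raise TypeError(f"unsupported value: {value!r}")


def sort_key(value):
    # Values are only ever compared with each other inside one category,
    # because the rank is compared first.
    return (CATEGORY_RANK[category_of(value)], value)


rows = ["pear", 3, date(2018, 11, 5), 1.5, "apple", 2]
ordered = sorted(rows, key=sort_key)
```

Here ordered places 1.5, 2, 3 first, then "apple", "pear", then the date. A declarative override would amount to swapping the ranks in CATEGORY_RANK.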
 
Define the *output type* for compatible schemas:
The output type of compatible schemas will be the 'higher' data type, for 
example: 
   (INT, FLOAT) ==> FLOAT
   (DATE, TIMESTAMP) ==> TIMESTAMP
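
A small sketch of picking the 'higher' type, in Python. The two ladders are assumptions: the numeric one reuses the INT, BIGINT, FLOAT, DOUBLE listing above as a promotion order (which may be lossy for BIGINT to FLOAT), and output_type() is a hypothetical helper.

```python
# Assumed promotion ladders, lowest to highest within each category.
NUMERIC_LADDER = ["INT", "BIGINT", "FLOAT", "DOUBLE"]
DATETIME_LADDER = ["DATE", "TIMESTAMP"]


def output_type(types):
    """Return the highest type if all types share one ladder."""
    for ladder in (NUMERIC_LADDER, DATETIME_LADDER):
        if all(t in ladder for t in types):
            return max(types, key=ladder.index)
    raise ValueError(f"incompatible types: {types}")
```

For example, output_type(["INT", "FLOAT"]) gives "FLOAT" and output_type(["DATE", "TIMESTAMP"]) gives "TIMESTAMP", while a mix such as INT and STRING is rejected.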
 
*Execution steps*
On each minor fragment that contains Sort, the algorithm would be as follows:
 # Start with a single sorter internally based on the input record batch's 
schema. If the input schema changes in a new record batch, start a new sorter.  
Thus, there will be N sorters internally within the Sort operator corresponding 
to the N schemas. NOTE: in most practical cases, N is quite small.  We can have 
a default 'schema change threshold' of 3. Beyond that, we would still throw a 
SchemaChangeException.
 # Each sorter maintains its own list of batches for its schema. 
 # Each sorter sorts the rows for all batches belonging to that schema, 
spilling as necessary.
 # Create as many 'mergers' as the number of compatible schema categories. Thus 
in scenario 1 there will be 1 merger while scenario 3 will have 2 mergers. 
 # Do a local merge of the data within each compatible category. The generated 
code that does the comparison will need to handle comparisons of INT vs. BIGINT 
vs. FLOAT, etc.
 # This also means that an operator needs to be able to use more than one 
version of generated code.  The exact mechanics of this need to be determined.  
If it is not feasible, then we may think of running in non-code-gen mode when 
we encounter a schema change. It will be slower, but at least the query will 
complete. 
 # Once the individual merges have been done within each compatible schema 
category, do a cross-schema-union.  This simply unions the results of the prior 
merge. The union will preserve the relative ordering among incompatible 
schemas. 
 # After the union, each Output Batch that is created will have the schema it 
inherits from its input to the union. So if there are 3 inputs to the union, 
there will be at least 3 output batches created, each with a different schema. 
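
The steps above can be sketched roughly as follows, in Python and for a single sort key. This is a toy model under stated assumptions, not Drill's implementation: spilling and code generation are left out, the threshold is read as "at most 3 distinct schemas", and SchemaChangeError, CATEGORY, CATEGORY_RANK and sort_with_schema_changes() are hypothetical names.

```python
import heapq

# Assumed reading of the 'schema change threshold': at most 3 distinct schemas.
SCHEMA_CHANGE_THRESHOLD = 3
CATEGORY = {"INT": "numeric", "BIGINT": "numeric", "FLOAT": "numeric",
            "DOUBLE": "numeric", "STRING": "string"}
CATEGORY_RANK = {"numeric": 0, "string": 1}


class SchemaChangeError(Exception):
    """Stand-in for Drill's SchemaChangeException."""


def sort_with_schema_changes(batches):
    """batches: iterable of (type_name, values) for a single sort key."""
    # Steps 1-2: one 'sorter' (here just a list of rows) per schema.
    sorters = {}
    for type_name, values in batches:
        if type_name not in sorters and len(sorters) >= SCHEMA_CHANGE_THRESHOLD:
            raise SchemaChangeError(f"too many schemas, saw {type_name}")
        sorters.setdefault(type_name, []).extend(values)

    # Step 3: sort each schema's rows; group sorted runs by category.
    runs_by_category = {}
    for type_name, rows in sorters.items():
        runs_by_category.setdefault(CATEGORY[type_name], []).append(sorted(rows))

    # Steps 4-5: one merger per compatible category.
    merged = {c: list(heapq.merge(*runs)) for c, runs in runs_by_category.items()}

    # Steps 7-8: cross-schema union, one output per union input.
    return [(c, merged[c]) for c in sorted(merged, key=CATEGORY_RANK.__getitem__)]


scenario3 = [("INT", [3, 1]), ("STRING", ["b", "a"]), ("FLOAT", [2.5, 0.5])]
result = sort_with_schema_changes(scenario3)
```

For Scenario 3 this yields one merged numeric output followed by one string output, matching the two mergers mentioned in the steps.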
 
Here are the steps that would need to happen internally in the Sort operator 
for Scenario 3: 
{noformat}
                  Output batch
                      |
                Cross-Schema-Union
                  /       \
              Merge      Sort (schema 3)
            /      \     
         Sort       Sort
      (schema 1)  (schema 2){noformat}
 
On the foreman node that does the final Merge, there will again be a two-step 
process: first do the individual merge of compatible schemas coming from 
separate minor fragments, followed by the Cross-schema-Union. 
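
A rough Python sketch of that two-step process, assuming each minor fragment hands over one sorted run per compatible category. The dict-based fragment output, CATEGORY_RANK and foreman_merge() are hypothetical and only illustrate the shape of the computation.

```python
import heapq

# Assumed default ordering between incompatible categories.
CATEGORY_RANK = {"numeric": 0, "string": 1}


def foreman_merge(fragment_outputs):
    """fragment_outputs: list of dicts mapping category -> sorted run."""
    categories = {c for output in fragment_outputs for c in output}
    result = []
    for category in sorted(categories, key=CATEGORY_RANK.__getitem__):
        # Step 1: merge the runs of one compatible category across fragments.
        runs = [output[category] for output in fragment_outputs if category in output]
        # Step 2: the cross-schema union appends categories in policy order.
        result.append((category, list(heapq.merge(*runs))))
    return result


frag_a = {"numeric": [1, 4.5], "string": ["b"]}
frag_b = {"numeric": [2, 3.0], "string": ["a", "c"]}
merged = foreman_merge([frag_a, frag_b])
```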
  
*Schema change due to NULLABILITY:*
In all the above scenarios, the data type may be nullable or non-nullable. If 
the schema change occurs because a type changed from non-nullable to nullable, 
it would be handled the same way as we would handle a 'compatible' schema 
change since a nullable INT type is compatible with a non-nullable INT type. 
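
As an illustration in Python, nullability can be kept as a flag next to the type name and ignored when checking compatibility. ColumnType and merged_type() are hypothetical, and making the output nullable whenever either input is nullable is an assumption, not something decided above.

```python
from typing import NamedTuple


class ColumnType(NamedTuple):
    name: str
    nullable: bool


def same_ignoring_nullability(a, b):
    """Nullable INT and non-nullable INT count as the same type."""
    return a.name == b.name


def merged_type(a, b):
    if not same_ignoring_nullability(a, b):
        raise ValueError(f"different types: {a.name} vs {b.name}")
    # Assumption: the output is nullable if either input is nullable.
    return ColumnType(a.name, a.nullable or b.nullable)


required_int = ColumnType("INT", nullable=False)
optional_int = ColumnType("INT", nullable=True)
```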
 
*Schema change from Scalar type to Array type:*
This may occur, for example, when a single phone number changes to an array of 
phone numbers in a customer table. Doing a Sort on an array does not make sense, 
but instead of failing the query, we could treat it the same way as an 
incompatible schema change by doing a cross-schema-union. Within the rows 
containing arrays, the order would be the same as the input order. 
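
A tiny Python sketch of that behavior. Placing the array rows after the scalar rows is an assumption (the relative position is not specified above), and sort_scalars_then_arrays() is a hypothetical helper.

```python
def sort_scalars_then_arrays(values):
    """Sort scalar rows; keep array rows in their input order."""
    scalars = sorted(v for v in values if not isinstance(v, list))
    arrays = [v for v in values if isinstance(v, list)]
    # Assumption: array rows follow scalar rows in the union.
    return scalars + arrays


phones = ["555-0102", ["555-0199", "555-0100"], "555-0101", ["555-0150"]]
ordered_phones = sort_scalars_then_arrays(phones)
```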
 
*Schema change in the non-sortkey columns and due to missing/added columns*:
<TODO>

> Handle schema change in ExternalSort
> ------------------------------------
>
>                 Key: DRILL-6829
>                 URL: https://issues.apache.org/jira/browse/DRILL-6829
>             Project: Apache Drill
>          Issue Type: New Feature
>            Reporter: Aman Sinha
>            Priority: Major
>
> While we continue to enhance the schema provision and metastore aspects in 
> Drill, we also should explore what it means to be truly schema-less such that 
> we can better handle {semi, un}structured data and data sitting in DBs that 
> store JSON documents (e.g., Mongo, MapR-DB). 
>  
> The blocking operators are the main hurdles in this goal (other operators 
> also need to be smarter about this but the problem is harder for the blocking 
> operators).   This Jira is specifically about ExternalSort. 
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
