[
https://issues.apache.org/jira/browse/DRILL-6829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16676056#comment-16676056
]
Aman Sinha commented on DRILL-6829:
-----------------------------------
Here's one approach. Other approaches are also welcome and we can consider the
pros and cons.
Consider a Sort on (a1, b1)
*Scenario 1*: schema change from one numeric type to another numeric type
Batch 1: a1: INT, b1: INT (schema 1)
Batch 2: a1: FLOAT, b1: INT (schema 2)
*Scenario 2*: schema change from numeric to string type
Batch 1: a1: INT, b1: INT (schema 1)
Batch 2: a1: STRING, b1: INT (schema 3)
*Scenario 3:* schema change from numeric to string to float
Batch 1: a1: INT, b1: INT (schema 1)
Batch 2: a1: STRING, b1: INT (schema 3)
Batch 3: a1: FLOAT, b1: INT (schema 2)
Let's define *compatible schemas:*
schema 1 is compatible with schema 2 since INT and FLOAT are both numeric
types.
Schemas 1 and 2 are not compatible with schema 3. Technically one could cast
the numeric types to string, but if the user has not explicitly done a CAST
then we cannot impose it.
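To make this concrete, here is a rough sketch of what the compatibility check
could look like (the TypeCategory enum and the names below are hypothetical
stand-ins, not Drill's actual type APIs):
{code:java}
// Illustrative only -- TypeCategory, categoryOf() and isCompatible() are
// hypothetical names, not Drill's actual type machinery.
enum TypeCategory { NUMERIC, STRING, DATETIME, OTHER }

final class SchemaCompat {

  static TypeCategory categoryOf(String type) {
    switch (type) {
      case "INT": case "BIGINT": case "FLOAT": case "DOUBLE":
        return TypeCategory.NUMERIC;
      case "STRING": case "VARCHAR":
        return TypeCategory.STRING;
      case "DATE": case "TIMESTAMP":
        return TypeCategory.DATETIME;
      default:
        return TypeCategory.OTHER;
    }
  }

  // Two schemas are compatible iff every sort-key column falls in the
  // same category in both schemas.
  static boolean isCompatible(String[] keysA, String[] keysB) {
    if (keysA.length != keysB.length) {
      return false;
    }
    for (int i = 0; i < keysA.length; i++) {
      if (categoryOf(keysA[i]) != categoryOf(keysB[i])) {
        return false;  // e.g. a1: INT vs a1: STRING => incompatible
      }
    }
    return true;
  }
}
{code}
With this, scenario 1 (INT vs FLOAT on a1) stays compatible, while scenarios 2
and 3 introduce an incompatible schema on a1.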
Define *sort order* among different schemas:
For compatible schemas, this is straightforward. For instance, the sort order
among all numeric types is: INT, BIGINT, FLOAT, DOUBLE. (TODO: need to define
the behavior for the Decimal type.)
What is the sort order between incompatible schemas? We can have a policy that
says, by default, numeric values appear first, followed by String values,
followed by Date values, etc. This is similar to NULLS FIRST/NULLS LAST. This
policy will appear in the external documentation so that users and BI tools
are aware of the expected order. In the future we may add a declarative way of
changing the default sort order between incompatible schemas.
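As a sketch of that default policy (illustrative only; this reuses the
hypothetical TypeCategory enum from the sketch above):
{code:java}
import java.util.Comparator;

// Illustrative only -- lower rank sorts first, analogous to NULLS FIRST.
// This fixed table is what would appear in the external documentation.
final class CrossSchemaOrder {

  static int rankOf(TypeCategory c) {
    switch (c) {
      case NUMERIC:  return 0;  // numeric rows come first by default
      case STRING:   return 1;  // then strings
      case DATETIME: return 2;  // then dates/timestamps
      default:       return 3;  // everything else last
    }
  }

  // Decides only the relative order of whole categories in the final
  // output; rows *within* a category come from that category's merge.
  static final Comparator<TypeCategory> DEFAULT =
      Comparator.comparingInt(CrossSchemaOrder::rankOf);
}
{code}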
Define the *output type* for compatible schemas:
The output type of compatible schemas will be the 'higher' data type, for
example:
(INT, FLOAT) ==> FLOAT
(DATE, TIMESTAMP) ==> TIMESTAMP
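A possible sketch of the promotion rule (hypothetical helper, not Drill code;
the precedence lists would also need an entry for Decimal once that behavior
is defined):
{code:java}
import java.util.Arrays;
import java.util.List;

// Illustrative only -- the output type of two compatible types is the one
// that appears later in its fixed precedence list ("higher" wins).
final class TypePromotion {

  private static final List<String> NUMERIC =
      Arrays.asList("INT", "BIGINT", "FLOAT", "DOUBLE");
  private static final List<String> DATETIME =
      Arrays.asList("DATE", "TIMESTAMP");

  static String promote(String a, String b) {
    for (List<String> order : Arrays.asList(NUMERIC, DATETIME)) {
      if (order.contains(a) && order.contains(b)) {
        return order.indexOf(a) >= order.indexOf(b) ? a : b;
      }
    }
    throw new IllegalArgumentException("not compatible: " + a + ", " + b);
  }
}
// promote("INT", "FLOAT")      returns "FLOAT"
// promote("DATE", "TIMESTAMP") returns "TIMESTAMP"
{code}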
*Execution steps*
On each minor fragment that contains Sort, the algorithm would be as follows:
# Start with a single sorter internally, based on the input record batch's
schema. If the input schema changes in a new record batch, start a new sorter.
Thus, there will be N sorters internally within the Sort operator,
corresponding to the N schemas (a skeletal sketch follows the diagram below).
NOTE: in most practical cases N is quite small. We can have a default 'schema
change threshold' of 3; beyond that, we would still throw a
SchemaChangeException.
# Each sorter maintains a separate list of batches for each schema.
# Each sorter sorts the rows for all batches belonging to that schema,
spilling as necessary.
# Create as many 'mergers' as the number of compatible schema categories. Thus
in scenario 1 there will be 1 merger while scenario 3 will have 2 mergers.
# Do a local merge of the data within each compatible category. The generated
code that does the comparison will need to handle cross-type comparisons such
as INT vs BIGINT vs FLOAT, etc.
# This also means that an operator needs to be able to use more than 1 version
of generated code. The exact mechanics of this need to be determined. If that
is not feasible, we may consider running in non-code-gen mode when we
encounter a schema change; it will be slower, but at least the query will
complete.
# Once the individual merges have been done within each compatible schema
category, do a cross-schema-union. This simply unions the results of the prior
merge. The union will preserve the relative ordering among incompatible
schemas.
# After the union, each Output Batch that is created will have the schema it
inherits from its side of the union. So if there are 3 inputs to the union,
there will be at least 3 output batches, each with a different schema.
Here are the steps that would need to happen internally in the Sort operator
for Scenario 3:
{noformat}
                Output batch
                     |
            Cross-Schema-Union
            /                \
        Merge           Sort (schema 3)
       /     \
    Sort      Sort
(schema 1) (schema 2)
{noformat}
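A skeletal sketch of the per-minor-fragment flow in the steps above (all names
here are illustrative stand-ins, not the actual ExternalSort internals):
{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative skeleton only -- Batch, Sorter, schemaKey() etc. are
// hypothetical stand-ins for the real ExternalSort data structures.
final class MultiSchemaSort {

  static final int SCHEMA_CHANGE_THRESHOLD = 3;   // default from step 1

  interface Batch { String schemaKey(); }         // e.g. "INT,INT"

  static final class Sorter {                     // one per distinct schema
    final List<Batch> batches = new ArrayList<>();
    void add(Batch b) { batches.add(b); }
    void sortAndSpill() { /* step 3: sort this schema's rows, spilling */ }
  }

  // LinkedHashMap keeps schema arrival order for the later union.
  private final Map<String, Sorter> sorters = new LinkedHashMap<>();

  void onBatch(Batch b) {                         // steps 1 and 2
    Sorter s = sorters.get(b.schemaKey());
    if (s == null) {
      if (sorters.size() >= SCHEMA_CHANGE_THRESHOLD) {
        // beyond the threshold we still fail with SchemaChangeException
        throw new IllegalStateException("schema change threshold exceeded");
      }
      s = new Sorter();
      sorters.put(b.schemaKey(), s);
    }
    s.add(b);
  }

  void finish() {
    sorters.values().forEach(Sorter::sortAndSpill);          // step 3
    // steps 4-5: group the sorters into compatible categories and run
    //            one merge per category (with cross-type comparators)
    // steps 6-8: cross-schema union of the per-category merge outputs,
    //            emitting at least one output batch per distinct schema
  }
}
{code}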
On the foreman node that does the final merge, there will again be a 2-step
process: first do the individual merges of compatible schemas coming from the
separate minor fragments, followed by the Cross-Schema-Union.
*Schema change due to NULLABILITY:*
In all the above scenarios, the data type may be nullable or non-nullable. If
the schema change occurs because a type changed from non-nullable to nullable,
it would be handled the same way as we would handle a 'compatible' schema
change since a nullable INT type is compatible with a non-nullable INT type.
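For illustration (again hypothetical, building on the TypePromotion sketch
above), nullability simply folds into the promotion rule: the output column is
nullable if either input column is nullable:
{code:java}
// Illustrative only -- Drill's real code would work on its own type
// metadata rather than strings.
final class NullablePromotion {

  // Promote the types as before; the result is nullable if either side is.
  static String promote(String typeA, boolean nullableA,
                        String typeB, boolean nullableB) {
    String type = TypePromotion.promote(typeA, typeB);
    return type + ((nullableA || nullableB) ? " (nullable)" : " (required)");
  }
}
// promote("INT", false, "INT", true) returns "INT (nullable)"
{code}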
*Schema change from Scalar type to Array type:*
This may occur, for example, when a single phone number changes to an array of
phone numbers in a customer table. Doing a Sort on an array does not make
sense, but instead of failing the query we could treat it the same way as an
incompatible schema change by doing a cross-schema-union. Within the rows
containing arrays, the order would be the same as the input order.
*Schema change in the non-sortkey columns and due to missing/added columns*:
<TODO>
> Handle schema change in ExternalSort
> ------------------------------------
>
> Key: DRILL-6829
> URL: https://issues.apache.org/jira/browse/DRILL-6829
> Project: Apache Drill
> Issue Type: New Feature
> Reporter: Aman Sinha
> Priority: Major
>
> While we continue to enhance the schema provision and metastore aspects in
> Drill, we also should explore what it means to be truly schema-less such that
> we can better handle {semi, un}structured data, data sitting in DBs that
> store JSON documents (e.g. Mongo, MapR-DB).
>
> The blocking operators are the main hurdles in this goal (other operators
> also need to be smarter about this but the problem is harder for the blocking
> operators). This Jira is specifically about ExternalSort.
>
>