[ 
https://issues.apache.org/jira/browse/HIVE-10165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14517491#comment-14517491
 ] 

Elliot West commented on HIVE-10165:
------------------------------------

Happy to clarify:

The use case I've been considering is that there is a base dataset (0 or more 
rows) already in the transactional Hive table. Naturally this has record 
identifiers, and likely also some external primary key and possibly a 
versioning column. We then have some mutated records arriving from an external 
system. These do not have record identifiers but do include the primary key. A 
user-defined job external to Hive then consumes both of these datasets to 
determine which mutations should be streamed into Hive. The relevant record 
identifiers are obtained from the corresponding records in the base dataset. 
The exception is the insert mutation, which has no corresponding record in the 
base dataset but also does not require a record identifier.

The precise details of the user-defined job are outside the scope of the 
streaming API; ultimately they are use-case dependent. For example, the 
external data source may be able to provide the type of mutation on each 
incoming record, in which case the job need only pull the record identifiers 
from the corresponding records in the base dataset using the external primary 
key. Alternatively, an external source might simply provide records that fall 
after some timestamp, in which case the job will have to infer the mutation 
type by grouping the records by the primary key and, where necessary, sorting 
them by some versioning column. In each case the record identifier must be 
obtained from the existing record in the Hive table (inserts are an exception 
to this). The output of these jobs will be a set of mutations that can be 
pushed into the table via the streaming API. Importantly, these mutations will 
now have the correct record identifier attached.

Here is a hastily prepared example scheme which ignores the deletion case:

*Base dataset from Hive table*
||recordIdentifier||PK||last_updated||message||
|originaltxnid=1, lasttxnid=1, rowid=1|1|1000|Hello|
|originaltxnid=2, lasttxnid=2, rowid=2|2|1001|Namaste|

*Delta dataset from external system*
||PK||last_updated||message||
|1|2002|Updated hello|
|3|2001|Hola|

*Result - mutations to push to the Hive streaming API*
||recordIdentifier||PK||last_updated||message||operation_type||
|originaltxnid=1, lasttxnid=1, rowid=1|1|2002|Updated hello|UPDATE|
| |3|2001|Hola|INSERT|
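
To make the above concrete, here is a minimal sketch of the kind of merge step 
implied by these tables. The {{BaseRecord}}, {{DeltaRecord}}, {{Mutation}} and 
{{OperationType}} types are purely illustrative (as is keying the base dataset 
by the primary key); only {{RecordIdentifier}} is a real Hive class:

{code:java}
// Sketch only: join delta records to the base dataset on the external primary
// key and classify each as an INSERT or an UPDATE (the deletion case is
// ignored, as in the tables above). BaseRecord, DeltaRecord, Mutation and
// OperationType are illustrative types of my own, not part of any Hive API.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hive.ql.io.RecordIdentifier;

public class MutationMergeSketch {

  enum OperationType { INSERT, UPDATE }

  static class BaseRecord {
    RecordIdentifier recordIdentifier; // read back from the base Hive table
    long primaryKey;
    long lastUpdated;
  }

  static class DeltaRecord {
    long primaryKey;
    long lastUpdated;
  }

  static class Mutation {
    OperationType operation;
    RecordIdentifier recordIdentifier; // remains null for inserts
    DeltaRecord record;
  }

  static List<Mutation> merge(Map<Long, BaseRecord> baseByPrimaryKey,
      List<DeltaRecord> deltas) {
    List<Mutation> mutations = new ArrayList<Mutation>();
    for (DeltaRecord delta : deltas) {
      Mutation mutation = new Mutation();
      mutation.record = delta;
      BaseRecord existing = baseByPrimaryKey.get(delta.primaryKey);
      if (existing == null) {
        // No corresponding base record: an insert, which needs no record identifier.
        mutation.operation = OperationType.INSERT;
      } else if (delta.lastUpdated > existing.lastUpdated) {
        // A newer version of an existing record: an update, carrying the record
        // identifier taken from the base dataset.
        mutation.operation = OperationType.UPDATE;
        mutation.recordIdentifier = existing.recordIdentifier;
      } else {
        continue; // stale or unchanged record; nothing to emit
      }
      mutations.add(mutation);
    }
    return mutations;
  }
}
{code}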

By processing mutations in this manner I hope to minimise the amount of data we 
write to handle minor changes to our data. Currently we must rewrite entire 
partitions, but I hope to be able to instead write small deltas to existing 
partitions. Additionally, I hope this will solve issues we have with concurrent 
access to these mutating datasets. Suitably equipped readers should have no 
trouble consuming these datasets while mutations are occurring (by acquiring 
read locks and utilising the {{ValidTxnList}}; a rough sketch follows the list 
below). Currently we've approached concurrent access with these less desirable 
strategies:
# being lucky and hoping that partitions do not get replaced while some other 
job is starting to read them
# using highly contended dataset locks (implemented with simple lock files in 
HDFS)
# HDFS snapshots
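
To illustrate the reader side, here is a minimal sketch of obtaining a 
{{ValidTxnList}} snapshot from the metastore and using it to decide whether a 
given writing transaction had committed. How this is wired into a particular 
reader job is of course use-case dependent, so treat this only as an outline:

{code:java}
// Sketch only: fetch the set of transactions that were committed at the time
// of the call and use it to decide which delta records should be visible.
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

public class ReaderSnapshotSketch {

  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    HiveMetaStoreClient metastore = new HiveMetaStoreClient(conf);
    try {
      // A snapshot of the transactions considered valid (committed) right now.
      ValidTxnList validTxns = metastore.getValidTxns();

      long writingTxnId = 42L; // e.g. the originaltxnid read from a delta record
      if (validTxns.isTxnValid(writingTxnId)) {
        // The writing transaction had committed when the snapshot was taken,
        // so records it wrote may be surfaced to the reader.
        System.out.println("Transaction " + writingTxnId + " is visible in this snapshot");
      }
    } finally {
      metastore.close();
    }
  }
}
{code}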

Regarding the requirement to sort records by (originaltxnid, lasttxnid, rowid), 
I see this as a function of the external merge job, performed before the data 
is pushed into the streaming API. However, I now see two issues. The first is 
that I believe I only have easy access to {{originaltxnid}}, {{bucketid}} and 
{{rowid}} using the {{OrcInputFormat}} and {{RecordIdentifier}} - can you 
advise how I might also access the {{lasttxnid}}? The second is that the sort 
becomes the responsibility of the user-defined merge job and therefore could be 
forgotten or incorrectly implemented. To minimise problems I could call out the 
requirement in the documentation and possibly check that record identifiers are 
submitted to the transaction batch in sequential order.
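
For reference, here is a minimal sketch of the sort step I have in mind, 
assuming a hypothetical {{Mutation}} holder that carries the 
{{RecordIdentifier}} read from the base dataset (inserts, having no identifier, 
would be ordered separately). Note that {{RecordIdentifier}}'s natural ordering 
only covers the fields it exposes, which is precisely why I'm asking about 
{{lasttxnid}} above:

{code:java}
// Sketch only: order update/delete mutations by their RecordIdentifier before
// submitting them to a transaction batch. Mutation is an illustrative holder
// class of my own, not a Hive type; RecordIdentifier implements
// WritableComparable, so its natural ordering can be used directly.
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.apache.hadoop.hive.ql.io.RecordIdentifier;

public class MutationOrderingSketch {

  static class Mutation {
    final RecordIdentifier recordIdentifier;
    final Object record;

    Mutation(RecordIdentifier recordIdentifier, Object record) {
      this.recordIdentifier = recordIdentifier;
      this.record = record;
    }
  }

  /** Sorts mutations into the order expected by the underlying delta files. */
  static void sortForSubmission(List<Mutation> mutations) {
    Collections.sort(mutations, new Comparator<Mutation>() {
      @Override
      public int compare(Mutation a, Mutation b) {
        return a.recordIdentifier.compareTo(b.recordIdentifier);
      }
    });
  }
}
{code}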

 

> Improve hive-hcatalog-streaming extensibility and support updates and deletes.
> ------------------------------------------------------------------------------
>
>                 Key: HIVE-10165
>                 URL: https://issues.apache.org/jira/browse/HIVE-10165
>             Project: Hive
>          Issue Type: Improvement
>          Components: HCatalog
>            Reporter: Elliot West
>            Assignee: Elliot West
>              Labels: streaming_api
>             Fix For: 1.2.0
>
>         Attachments: HIVE-10165.0.patch
>
>
> h3. Overview
> I'd like to extend the 
> [hive-hcatalog-streaming|https://cwiki.apache.org/confluence/display/Hive/Streaming+Data+Ingest]
>  API so that it also supports the writing of record updates and deletes in 
> addition to the already supported inserts.
> h3. Motivation
> We have many Hadoop processes outside of Hive that merge changed facts into 
> existing datasets. Traditionally we achieve this by: reading in a 
> ground-truth dataset and a modified dataset, grouping by a key, sorting by a 
> sequence and then applying a function to determine inserted, updated, and 
> deleted rows. However, in our current scheme we must rewrite all partitions 
> that may potentially contain changes. In practice the number of mutated 
> records is very small when compared with the records contained in a 
> partition. This approach results in a number of operational issues:
> * Excessive amount of write activity required for small data changes.
> * Downstream applications cannot robustly read these datasets while they are 
> being updated.
> * Due to the scale of the updates (hundreds of partitions) the scope for 
> contention is high.
> I believe we can address this problem by instead writing only the changed 
> records to a Hive transactional table. This should drastically reduce the 
> amount of data that we need to write and also provide a means for managing 
> concurrent access to the data. Our existing merge processes can read and 
> retain each record's {{ROW_ID}}/{{RecordIdentifier}} and pass this through to 
> an updated form of the hive-hcatalog-streaming API which will then have the 
> required data to perform an update or insert in a transactional manner. 
> h3. Benefits
> * Enables the creation of large-scale dataset merge processes  
> * Opens up Hive transactional functionality in an accessible manner to 
> processes that operate outside of Hive.
> h3. Implementation
> Our changes do not break the existing API contracts. Instead our approach has 
> been to consider the functionality offered by the existing API and our 
> proposed API as fulfilling separate and distinct use-cases. The existing API 
> is primarily focused on the task of continuously writing large volumes of new 
> data into a Hive table for near-immediate analysis. Our use-case, however, is 
> concerned more with the frequent but not continuous ingestion of mutations to 
> a Hive table from some ETL merge process. Consequently we feel it is 
> justifiable to add our new functionality via an alternative set of public 
> interfaces and leave the existing API as is. This keeps both APIs clean and 
> focused at the expense of presenting additional options to potential users. 
> Wherever possible, shared implementation concerns have been factored out into 
> abstract base classes that are open to third-party extension. A detailed 
> breakdown of the changes is as follows:
> * We've introduced a public {{RecordMutator}} interface whose purpose is to 
> expose insert/update/delete operations to the user. This is a counterpart to 
> the write-only {{RecordWriter}}. We've also factored out life-cycle methods 
> common to these two interfaces into a super {{RecordOperationWriter}} 
> interface (a rough sketch of these interfaces appears at the end of this 
> description). Note that the row representation has been changed from 
> {{byte[]}} to {{Object}}. Within our data processing jobs our records are 
> often available in a strongly typed and decoded form such as a POJO or a 
> Tuple object. Therefore it seems sensible to be able to pass these through to 
> the {{OrcRecordUpdater}} without going through a {{byte[]}} encoding step. 
> This of course still allows users to use {{byte[]}} if they wish.
> * The introduction of {{RecordMutator}} requires that insert/update/delete 
> operations are then also exposed on a {{TransactionBatch}} type. We've done 
> this with the introduction of a public {{MutatorTransactionBatch}} interface 
> which is a counterpart to the write-only {{TransactionBatch}}. We've also 
> factored out life-cycle methods common to these two interfaces into a super 
> {{BaseTransactionBatch}} interface. 
> * Functionality that would be shared by implementations of both 
> {{RecordWriters}} and {{RecordMutators}} has been factored out of 
> {{AbstractRecordWriter}} into a new abstract base class 
> {{AbstractOperationRecordWriter}}. The visibility is such that it is open to 
> extension by third parties. The {{AbstractOperationRecordWriter}} also 
> permits the setting of the {{AcidOutputFormat.Options#recordIdColumn()}} 
> (defaulted to {{-1}}) which is a requirement for enabling updates and 
> deletes. Additionally, these options are now fed an {{ObjectInspector}} via 
> an abstract method so that a {{SerDe}} is not mandated (it was not required 
> for our use-case). The {{AbstractRecordWriter}} is now much leaner, handling 
> only the extraction of the {{ObjectInspector}} from the {{SerDe}}.
> * A new abstract class, {{AbstractRecordMutator}} has been introduced to act 
> as the base of concrete {{RecordMutator}} implementations. The key 
> functionality added by this class is a validation step on {{update}} and 
> {{delete}} to ensure that the record specified contains a 
> {{RecordIdentifier}}. This was added as it is not explicitly checked for 
> elsewhere and would otherwise generate an NPE deep down in 
> {{OrcRecordUpdater}}.
> * There are now two private transaction batch implementations: 
> {{HiveEndPoint.TransactionBatchImpl}} and its insert/update/delete 
> counterpart: {{HiveEndPoint.MutationTransactionBatchImpl}}. As you might 
> expect, {{TransactionBatchImpl}} must delegate to a {{RecordWriter}} 
> implementation whereas {{MutationTransactionBatchImpl}} must delegate to a 
> {{RecordMutator}} implementation. Shared transaction batch functionality has 
> been factored out into an {{AbstractTransactionBatch}} class. In the case of 
> {{MutationTransactionBatchImpl}} we've added a check to ensure that an error 
> occurs should a user submit multiple types of operation to the same batch as 
> we've found that this can lead to inconsistent data being returned from the 
> underlying table when read from Hive.
> * To enable the usage of the different transaction batch variants we've added 
> an additional transaction batch factory method to {{StreamingConnection}} and 
> provided a suitable implementation in {{HiveEndPoint}}. It's worth noting 
> that {{StreamingConnection}} is the only public facing component of the API 
> contract that contains references to both the existing writer scheme and our 
> mutator scheme.
> Please find these changes in the attached patch: [^HIVE-10165.0.patch].
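>
> To make the shape of the proposal concrete, here is a rough sketch of the two 
> headline interfaces described above. The method names and signatures shown 
> here are my inference from the text and are only illustrative; the 
> authoritative definitions are in the attached patch.
>
> {code:java}
> // Illustrative shape only; see HIVE-10165.0.patch for the real definitions.
> // Both interfaces are public in the proposal; they are kept package-private
> // here only so the sketch fits in a single file.
> import org.apache.hive.hcatalog.streaming.StreamingException;
>
> // Life-cycle methods shared with the existing write-only RecordWriter
> // (method names assumed here).
> interface RecordOperationWriter {
>   void flush() throws StreamingException;
>   void clear() throws StreamingException;
>   void closeBatch() throws StreamingException;
> }
>
> // Counterpart to the write-only RecordWriter; operations take Object rather
> // than byte[] so that decoded records (POJOs, Tuples) can be passed straight
> // through to the OrcRecordUpdater.
> interface RecordMutator extends RecordOperationWriter {
>   void insert(long transactionId, Object record) throws StreamingException;
>   void update(long transactionId, Object record) throws StreamingException;
>   void delete(long transactionId, Object record) throws StreamingException;
> }
> {code}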



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
