[ 
https://issues.apache.org/jira/browse/HIVE-10165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sushanth Sowmyan updated HIVE-10165:
------------------------------------
    Fix Version/s:     (was: 1.2.0)

> Improve hive-hcatalog-streaming extensibility and support updates and deletes.
> ------------------------------------------------------------------------------
>
>                 Key: HIVE-10165
>                 URL: https://issues.apache.org/jira/browse/HIVE-10165
>             Project: Hive
>          Issue Type: Improvement
>          Components: HCatalog
>            Reporter: Elliot West
>            Assignee: Elliot West
>              Labels: streaming_api
>         Attachments: HIVE-10165.0.patch
>
>
> h3. Overview
> I'd like to extend the 
> [hive-hcatalog-streaming|https://cwiki.apache.org/confluence/display/Hive/Streaming+Data+Ingest]
>  API so that it also supports the writing of record updates and deletes in 
> addition to the already supported inserts.
> h3. Motivation
> We have many Hadoop processes outside of Hive that merge changed facts into 
> existing datasets. Traditionally we achieve this by reading in a ground-truth 
> dataset and a modified dataset, grouping by a key, sorting by a sequence, and 
> then applying a function to determine inserted, updated, and deleted rows. 
> However, in our current scheme we must rewrite all partitions that may 
> potentially contain changes. In practice the number of mutated records is 
> very small compared with the total number of records in a partition. This 
> approach results in a number of operational issues:
> * Excessive amount of write activity required for small data changes.
> * Downstream applications cannot robustly read these datasets while they are 
> being updated.
> * Due to the scale of the updates (hundreds of partitions), the scope for 
> contention is high. 
> I believe we can address this problem by instead writing only the changed 
> records to a Hive transactional table. This should drastically reduce the 
> amount of data that we need to write and also provide a means for managing 
> concurrent access to the data. Our existing merge processes can read and 
> retain each record's {{ROW_ID}}/{{RecordIdentifier}} and pass this through to 
> an updated form of the hive-hcatalog-streaming API, which will then have the 
> required data to perform an update or delete in a transactional manner. 
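> To make the merge flow concrete, here is a minimal, hypothetical sketch of 
> the classification step. {{MutationType}}, {{Mutation}}, and 
> {{ChangeClassifier}} are illustrative names and not part of Hive; 
> {{RecordIdentifier}} is the existing Hive class:
> {code:java}
> import org.apache.hadoop.hive.ql.io.RecordIdentifier;
> 
> /** Illustrative operation types emitted by the merge step. */
> enum MutationType { INSERT, UPDATE, DELETE }
> 
> /** Illustrative pairing of a changed record with its RecordIdentifier. */
> class Mutation {
>   final MutationType type;
>   final RecordIdentifier rowId; // null for inserts; mandatory for updates/deletes
>   final Object record;          // decoded row, e.g. a POJO or a Tuple
> 
>   Mutation(MutationType type, RecordIdentifier rowId, Object record) {
>     this.type = type;
>     this.rowId = rowId;
>     this.record = record;
>   }
> }
> 
> class ChangeClassifier {
>   /** Classify a ground-truth/modified pair sharing a key; either side may be null. */
>   static Mutation classify(RecordIdentifier currentRowId, Object current, Object incoming) {
>     if (current == null) {
>       return new Mutation(MutationType.INSERT, null, incoming);        // new key
>     }
>     if (incoming == null) {
>       return new Mutation(MutationType.DELETE, currentRowId, current); // key removed
>     }
>     return new Mutation(MutationType.UPDATE, currentRowId, incoming);  // key changed
>   }
> }
> {code}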
> h3. Benefits
> * Enables the creation of large-scale dataset merge processes.
> * Opens up Hive transactional functionality in an accessible manner to 
> processes that operate outside of Hive.
> h3. Implementation
> Our changes do not break the existing API contracts. Instead our approach has 
> been to consider the functionality offered by the existing API and our 
> proposed API as fulfilling separate and distinct use-cases. The existing API 
> is primarily focused on the task of continuously writing large volumes of new 
> data into a Hive table for near-immediate analysis. Our use-case, however, is 
> concerned more with the frequent but not continuous ingestion of mutations to 
> a Hive table from some ETL merge process. Consequently we feel it is 
> justifiable to add our new functionality via an alternative set of public 
> interfaces and leave the existing API as is. This keeps both APIs clean and 
> focused at the expense of presenting additional options to potential users. 
> Wherever possible, shared implementation concerns have been factored out into 
> abstract base classes that are open to third-party extension. A detailed 
> breakdown of the changes is as follows:
> * We've introduced a public {{RecordMutator}} interface whose purpose is to 
> expose insert/update/delete operations to the user (a sketch of the new 
> interfaces follows this list). This is a counterpart to the write-only 
> {{RecordWriter}}. We've also factored out life-cycle methods common to these 
> two interfaces into a super {{RecordOperationWriter}} interface. Note that 
> the row representation has been changed from {{byte[]}} to {{Object}}. Within 
> our data processing jobs our records are often available in a strongly typed 
> and decoded form such as a POJO or a Tuple object. Therefore it seems 
> sensible to pass this through to the {{OrcRecordUpdater}} without having to 
> go through a {{byte[]}} encoding step. This of course still allows users to 
> use {{byte[]}} if they wish.
> * The introduction of {{RecordMutator}} requires that insert/update/delete 
> operations are then also exposed on a {{TransactionBatch}} type. We've done 
> this with the introduction of a public {{MutatorTransactionBatch}} interface 
> which is a counterpart to the write-only {{TransactionBatch}}. We've also 
> factored out life-cycle methods common to these two interfaces into a super 
> {{BaseTransactionBatch}} interface. 
> * Functionality that would be shared by implementations of both 
> {{RecordWriters}} and {{RecordMutators}} has been factored out of 
> {{AbstractRecordWriter}} into a new abstract base class 
> {{AbstractOperationRecordWriter}}. The visibility is such that it is open to 
> extension by third parties. The {{AbstractOperationRecordWriter}} also 
> permits setting the {{AcidOutputFormat.Options#recordIdColumn()}} option 
> (defaulted to {{-1}}), which is a requirement for enabling updates and 
> deletes. Additionally, these options are now fed an {{ObjectInspector}} via 
> an abstract method so that a {{SerDe}} is not mandated (it was not required 
> for our use-case). The {{AbstractRecordWriter}} is now much leaner, handling 
> only the extraction of the {{ObjectInspector}} from the {{SerDe}}.
> * A new abstract class, {{AbstractRecordMutator}}, has been introduced to act 
> as the base of concrete {{RecordMutator}} implementations. The key 
> functionality added by this class is a validation step on {{update}} and 
> {{delete}} to ensure that the record specified contains a 
> {{RecordIdentifier}}. This was added as it is not explicitly checked for 
> elsewhere and would otherwise generate an NPE deep down in 
> {{OrcRecordUpdater}}.
> * There are now two private transaction batch implementations: 
> {{HiveEndPoint.TransactionBatchImpl}} and its insert/update/delete 
> counterpart: {{HiveEndPoint.MutationTransactionBatchImpl}}. As you might 
> expect, {{TransactionBatchImpl}} delegates to a {{RecordWriter}} 
> implementation whereas {{MutationTransactionBatchImpl}} delegates to a 
> {{RecordMutator}} implementation. Shared transaction batch functionality has 
> been factored out into an {{AbstractTransactionBatch}} class. In the case of 
> {{MutationTransactionBatchImpl}} we've added a check to ensure that an error 
> occurs should a user submit multiple types of operation to the same batch, as 
> we've found that this can lead to inconsistent data being returned from the 
> underlying table when read from Hive.
> * To enable the usage of the different transaction batch variants we've added 
> an additional transaction batch factory method to {{StreamingConnection}} and 
> provided a suitable implementation in {{HiveEndPoint}} (see the usage sketch 
> at the end of this description). It's worth noting that 
> {{StreamingConnection}} is the only public-facing component of the API 
> contract that contains references to both the existing writer scheme and our 
> mutator scheme.
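> To illustrate the shape of the mutator-side contracts described above, the 
> following is a rough sketch. The interface names come from this description, 
> but the exact method signatures are assumptions rather than the patch's 
> actual code:
> {code:java}
> import org.apache.hive.hcatalog.streaming.StreamingException;
> 
> /** Life-cycle methods factored out of RecordWriter and RecordMutator (signatures assumed). */
> interface RecordOperationWriter {
>   void flush() throws StreamingException;
>   void clear() throws StreamingException;
>   void newBatch(Long minTxnId, Long maxTxnId) throws StreamingException;
>   void closeBatch() throws StreamingException;
> }
> 
> /** Counterpart to the write-only RecordWriter; rows are Objects rather than byte[]. */
> interface RecordMutator extends RecordOperationWriter {
>   void insert(long transactionId, Object record) throws StreamingException;
>   void update(long transactionId, Object record) throws StreamingException; // needs a RecordIdentifier
>   void delete(long transactionId, Object record) throws StreamingException; // needs a RecordIdentifier
> }
> 
> /** Life-cycle methods shared with the existing TransactionBatch (signatures assumed). */
> interface BaseTransactionBatch {
>   void beginNextTransaction() throws StreamingException;
>   void commit() throws StreamingException;
>   void abort() throws StreamingException;
>   void close() throws StreamingException;
> }
> 
> /** Counterpart to the write-only TransactionBatch; one operation type per batch. */
> interface MutatorTransactionBatch extends BaseTransactionBatch {
>   void insert(Object record) throws StreamingException;
>   void update(Object record) throws StreamingException;
>   void delete(Object record) throws StreamingException;
> }
> {code}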
> Please find these changes in the attached patch: [^HIVE-10165.0.patch].
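> For completeness, a hypothetical end-to-end usage of the mutator scheme might 
> look as follows. It builds on the sketches above; the 
> {{fetchMutatorTransactionBatch}} factory method name is an assumption (only 
> the existence of such a factory on {{StreamingConnection}} is described 
> above), while {{HiveEndPoint}} and {{newConnection}} are the existing API:
> {code:java}
> import java.util.Arrays;
> import org.apache.hive.hcatalog.streaming.HiveEndPoint;
> import org.apache.hive.hcatalog.streaming.StreamingConnection;
> 
> class MutationExample {
>   /** Apply a batch of updates in a single transaction (sketch only). */
>   static void applyUpdates(RecordMutator mutator, Iterable<Mutation> updates) throws Exception {
>     HiveEndPoint endPoint = new HiveEndPoint(
>         "thrift://metastore:9083", "my_db", "facts", Arrays.asList("2015", "03"));
>     StreamingConnection connection = endPoint.newConnection(true);
>     try {
>       MutatorTransactionBatch batch =
>           connection.fetchMutatorTransactionBatch(10, mutator); // hypothetical factory method
>       batch.beginNextTransaction();
>       for (Mutation m : updates) {
>         batch.update(m.record); // a single operation type per batch, as noted above
>       }
>       batch.commit();
>       batch.close();
>     } finally {
>       connection.close();
>     }
>   }
> }
> {code}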



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
