[ 
https://issues.apache.org/jira/browse/HIVE-10165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elliot West updated HIVE-10165:
-------------------------------
    Attachment: HIVE-10165.6.patch

Thank you for such an in depth review. I've updated the patch: 
[^HIVE-10165.6.patch]. If one of your review points is not replied to here then 
you may consider it addressed directly in the patch. Here are my thoughts 
regarding the other specific issues, including any resolutions that I've 
applied:

{quote}
Lock.internalRelease: You've built in handling for releasing locks that are not 
part of transactions. When you do envision users locking something that isn't 
part of a transaction? Since this is doing write operations I would assume 
you'll always have a transaction.
{quote}

My intention is that this class will also be independently useful for  
components that read ACID data, as they will need to initiate a read lock with 
the MetaStore. As an example, I hope to replace [this code in the 
cascading-hive 
project|https://github.com/Cascading/cascading-hive/blob/wip-2.0/src/main/java/cascading/tap/hive/LockManager.java]
 with an instance of this {{Lock}} class.

{quote}
Transaction: Why do commit() and abort() release the locks? Since these locks 
are part of a transaction they will always be released when the transaction is 
committed or aborted.
{quote}

This is in place purely to cancel the heartbeat timer. If you look at 
{{Lock:143}} you may notice that an {{unlock}} call isn't issued to the 
MetaStore as part of the {{release}} if the lock is part of a transaction.

{quote}
MutatorClient: Why is Lock external to this class? It seems like Lock is a 
component of this class. Or do you envision users using one Lock object to 
manage multiple MutatorClients?
{quote}

This comment helped me see that I was missing a test, thanks. No sharing of 
locks was intended, I wanted a way to inject a mock which I then discovered was 
unnecessary once I had written said test. Lock construction now takes place in 
the {{Transaction}} not the {{MutatorClientBuilder}}.

{quote}
MutatorCoordinator: In the constructor, why are you passing in 
CreatePartitionHelper and SequenceValidator when there's only one instance of 
these?
{quote}

I wanted the ability to mock them in the {{TestMutatorCoordinator}} test. They 
are package private, so this separation doesn't leak into the public API.
If this is undesirable, can you recommend an alternative approach?

{quote}
MutatorCoordinator.resetMutator, this code is closing the Mutator everytime you 
switch Mutators. But if I understand correctly this is going to result in 
writing a footer in the ORC file. You're going to end up with a thousand tiny 
stripes in your files. That is not what you want. You do need to make sure you 
don't have too many open at a time to avoids OOMs and too many file handles 
open errors. But you'll need to keep a list of which ones are open and then 
close them on an LRU basis (or maybe pick the one with the most records since 
it will give you the best stripe size) as you need to open more rather than 
closing each one each time. Owen O'Malley comments?
{quote}

This class relies on the correct grouping of the data (by partition,bucket) to 
avoid the problem that you describe. As long as the data arrives grouped in 
this way, we can guarantee that once a given {{Mutator}} has been closed it'll 
never be needed again. It's also resource-light approach too, only one 
{{Mutator}} (and hence file) need be open at a given time. A {{Mutator}} cache 
would introduce more flexibility and resilience by relaxing the data grouping 
requirement, but this could then push optimisation decisions back to the user, 
now having to trade-off the number of open mutators and stripe size. I felt 
that as the user must sort the data anyway (lastTxnId,rowId) a grouping could 
generally be obtained for 'free' at the same time and thus allow a simpler 
mechanism to be employed. Very keen to hear your thoughts on this.

{quote}
CreationPartitionHelper.createPartitionIfNotExists: Why are you running the 
Driver class here? Why not call IMetaStoreClient.addPartition()? That would be 
much lighter weight.
{quote}

Agreed. I lifted [this code from the original streaming 
API|https://github.com/apache/hive/blob/80fb8913196eef8e4125544c3138b0c73be267b7/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L350]
 and assumed that this worked around a limitation that I wasn't aware of. I've 
modified it to use the MetaStoreClient.


> Improve hive-hcatalog-streaming extensibility and support updates and deletes.
> ------------------------------------------------------------------------------
>
>                 Key: HIVE-10165
>                 URL: https://issues.apache.org/jira/browse/HIVE-10165
>             Project: Hive
>          Issue Type: Improvement
>          Components: HCatalog
>    Affects Versions: 1.2.0
>            Reporter: Elliot West
>            Assignee: Elliot West
>              Labels: streaming_api
>         Attachments: HIVE-10165.0.patch, HIVE-10165.4.patch, 
> HIVE-10165.5.patch, HIVE-10165.6.patch, mutate-system-overview.png
>
>
> h3. Overview
> I'd like to extend the 
> [hive-hcatalog-streaming|https://cwiki.apache.org/confluence/display/Hive/Streaming+Data+Ingest]
>  API so that it also supports the writing of record updates and deletes in 
> addition to the already supported inserts.
> h3. Motivation
> We have many Hadoop processes outside of Hive that merge changed facts into 
> existing datasets. Traditionally we achieve this by: reading in a 
> ground-truth dataset and a modified dataset, grouping by a key, sorting by a 
> sequence and then applying a function to determine inserted, updated, and 
> deleted rows. However, in our current scheme we must rewrite all partitions 
> that may potentially contain changes. In practice the number of mutated 
> records is very small when compared with the records contained in a 
> partition. This approach results in a number of operational issues:
> * Excessive amount of write activity required for small data changes.
> * Downstream applications cannot robustly read these datasets while they are 
> being updated.
> * Due to scale of the updates (hundreds or partitions) the scope for 
> contention is high. 
> I believe we can address this problem by instead writing only the changed 
> records to a Hive transactional table. This should drastically reduce the 
> amount of data that we need to write and also provide a means for managing 
> concurrent access to the data. Our existing merge processes can read and 
> retain each record's {{ROW_ID}}/{{RecordIdentifier}} and pass this through to 
> an updated form of the hive-hcatalog-streaming API which will then have the 
> required data to perform an update or insert in a transactional manner. 
> h3. Benefits
> * Enables the creation of large-scale dataset merge processes  
> * Opens up Hive transactional functionality in an accessible manner to 
> processes that operate outside of Hive.
> h3. Implementation
> Our changes do not break the existing API contracts. Instead our approach has 
> been to consider the functionality offered by the existing API and our 
> proposed API as fulfilling separate and distinct use-cases. The existing API 
> is primarily focused on the task of continuously writing large volumes of new 
> data into a Hive table for near-immediate analysis. Our use-case however, is 
> concerned more with the frequent but not continuous ingestion of mutations to 
> a Hive table from some ETL merge process. Consequently we feel it is 
> justifiable to add our new functionality via an alternative set of public 
> interfaces and leave the existing API as is. This keeps both APIs clean and 
> focused at the expense of presenting additional options to potential users. 
> Wherever possible, shared implementation concerns have been factored out into 
> abstract base classes that are open to third-party extension. A detailed 
> breakdown of the changes is as follows:
> * We've introduced a public {{RecordMutator}} interface whose purpose is to 
> expose insert/update/delete operations to the user. This is a counterpart to 
> the write-only {{RecordWriter}}. We've also factored out life-cycle methods 
> common to these two interfaces into a super {{RecordOperationWriter}} 
> interface.  Note that the row representation has be changed from {{byte[]}} 
> to {{Object}}. Within our data processing jobs our records are often 
> available in a strongly typed and decoded form such as a POJO or a Tuple 
> object. Therefore is seems to make sense that we are able to pass this 
> through to the {{OrcRecordUpdater}} without having to go through a {{byte[]}} 
> encoding step. This of course still allows users to use {{byte[]}} if they 
> wish.
> * The introduction of {{RecordMutator}} requires that insert/update/delete 
> operations are then also exposed on a {{TransactionBatch}} type. We've done 
> this with the introduction of a public {{MutatorTransactionBatch}} interface 
> which is a counterpart to the write-only {{TransactionBatch}}. We've also 
> factored out life-cycle methods common to these two interfaces into a super 
> {{BaseTransactionBatch}} interface. 
> * Functionality that would be shared by implementations of both 
> {{RecordWriters}} and {{RecordMutators}} has been factored out of 
> {{AbstractRecordWriter}} into a new abstract base class 
> {{AbstractOperationRecordWriter}}. The visibility is such that it is open to 
> extension by third parties. The {{AbstractOperationRecordWriter}} also 
> permits the setting of the {{AcidOutputFormat.Options#recordIdColumn()}} 
> (defaulted to {{-1}}) which is a requirement for enabling updates and 
> deletes. Additionally, these options are now fed an {{ObjectInspector}} via 
> an abstract method so that a {{SerDe}} is not mandated (it was not required 
> for our use-case). The {{AbstractRecordWriter}} is now much leaner, handling 
> only the extraction of the {{ObjectInspector}} from the {{SerDe}}.
> * A new abstract class, {{AbstractRecordMutator}} has been introduced to act 
> as the base of concrete {{RecordMutator}} implementations. The key 
> functionality added by this class is a validation step on {{update}} and 
> {{delete}} to ensure that the record specified contains a 
> {{RecordIdentifier}}. This was added as it is not explicitly checked for 
> elsewhere and would otherwise generate an NPE deep down in 
> {{OrcRecordUpdater}}.
> * There are now two private transaction batch implementations: 
> {{HiveEndPoint.TransactionBatchImpl}} and its insert/update/delete 
> counterpart: {{HiveEndPoint.MutationTransactionBatchImpl}}. As you might 
> expect, {{TransactionBatchImpl}} must delegate to a {{RecordWriter}} 
> implementation whereas {{MutationTransactionBatchImpl}} must delegates to a 
> {{RecordMutator}} implementation. Shared transaction batch functionality has 
> been factored out into an {{AbstractTransactionBatch}} class. In the case of 
> {{MutationTransactionBatchImpl}} we've added a check to ensure that an error 
> occurs should a user submit multiple types of operation to the same batch as 
> we've found that this can lead to inconsistent data being returned from the 
> underlying table when read from Hive.
> * To enable the usage of the different transaction batch variants we've added 
> an additional transaction batch factory method to {{StreamingConnection}} and 
> provided a suitable implementation in {{HiveEndPoint}}. It's worth noting 
> that {{StreamingConnection}} is the only public facing component of the API 
> contract that contains references to both the existing writer scheme and our 
> mutator scheme.
> Please find this changes in the attached patch: [^HIVE-10165.0.patch].



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to