[
https://issues.apache.org/jira/browse/HIVE-14980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15578923#comment-15578923
]
Mahipal Jupalli commented on HIVE-14980:
----------------------------------------
Hi,
My idea is to replicate the same checks from the Initiator to the Worker logic.
{code:title=org.apache.hadoop.hive.ql.txn.compactor.Initiator.java|borderStyle=solid}
// Figure out if there are any currently running compactions on the same table
or partition.
private boolean lookForCurrentCompactions(ShowCompactResponse compactions,
CompactionInfo ci) {
if (compactions.getCompacts() != null) {
for (ShowCompactResponseElement e : compactions.getCompacts()) {
if ((e.getState().equals(TxnStore.WORKING_RESPONSE) ||
e.getState().equals(TxnStore.INITIATED_RESPONSE)) &&
e.getDbname().equals(ci.dbname) &&
e.getTablename().equals(ci.tableName) &&
(e.getPartitionname() == null && ci.partName == null ||
e.getPartitionname().equals(ci.partName))) {
return true;
}
}
}
return false;
}
public void run(){
//...
if (lookForCurrentCompactions(currentCompactions, ci)) {
LOG.debug("Found currently initiated or working compaction for " +
ci.getFullPartitionName() + " so we will not initiate another compaction");
continue;
}
//...
}
{code}
{code:title=org.apache.hadoop.hive.ql.txn.compactor.Worker.java|borderStyle=solid}
public void run() {
//...
// This chicanery is to get around the fact that the table needs to
be final in order to
// go into the doAs below.
final Table t = t1;
ShowCompactResponse currentCompactions = txnHandler.showCompact(new
ShowCompactRequest());
if (lookForCurrentCompactions(currentCompactions, ci)) {
LOG.debug("Found currently initiated or working compaction for " +
ci.getFullPartitionName() + " so we will not initiate another
compaction");
continue;
}
// Find the partition we will be working with, if there is one.
Partition p = null;
//...
//Figure out if there are any currently running compactions on the same
table or partition.
private boolean lookForCurrentCompactions(ShowCompactResponse compactions,
CompactionInfo ci) {
if (compactions.getCompacts() != null) {
for (ShowCompactResponseElement e : compactions.getCompacts()) {
if ((e.getState().equals(TxnStore.WORKING_RESPONSE) ||
e.getState().equals(TxnStore.INITIATED_RESPONSE)) &&
e.getDbname().equals(ci.dbname) &&
e.getTablename().equals(ci.tableName) &&
(e.getPartitionname() == null && ci.partName == null ||
e.getPartitionname().equals(ci.partName))) {
return true;
}
}
}
return false;
}
}
//...
{code}
Please let me know if this is the correct approach.
> Minor compaction when triggered simultaniously on the same table/partition
> deletes data
> ---------------------------------------------------------------------------------------
>
> Key: HIVE-14980
> URL: https://issues.apache.org/jira/browse/HIVE-14980
> Project: Hive
> Issue Type: Bug
> Components: Metastore
> Affects Versions: 2.1.0
> Reporter: Mahipal Jupalli
> Assignee: Mahipal Jupalli
> Priority: Critical
> Original Estimate: 96h
> Remaining Estimate: 96h
>
> I have two tables (TABLEA, TABLEB). If I manually trigger compaction after
> each INSERT into TABLEB from TABLEA, compactions are triggered on random
> metastore asynchronously and are stepping on each other which is causing the
> data to be deleted.
> Example here:
> TABLEA - has 10k rows.
> insert into mj.tableb select * from mj.tablea;
> alter table mj.tableb compact 'MINOR';
> insert into mj.tableb select * from mj.tablea;
> alter table mj.tableb compact 'MINOR';
> Once all the compactions are complete, I should ideally see 20k rows in
> TABLEB. But I see only 10k rows (Only the rows INSERTED before the last
> compaction persist, the old rows are deleted. I believe the old delta files
> are deleted).
> To further confirm the bug, if I do only one compaction after two inserts, I
> see 20k rows in TABLEB.
> Proposed Fix:
> I have identified the bug in the code, it requires an additional check in the
> org.apache.hadoop.hive.ql.txn.compactor.Worker class to check for any active
> compactions on the table/partition. I will 'share the details of the fix once
> I test it.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)