[ https://issues.apache.org/jira/browse/HIVE-7803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14114404#comment-14114404 ]
Sushanth Sowmyan commented on HIVE-7803:
----------------------------------------

Hi, I like this change - it's simple, and it simplifies the flow to the OutputCommitter nicely. (It will need testing to verify that it works, and probably the full suite of e2e tests as well to verify that it continues to work from HCatStorer, but it's a good change.)

That said, I think this patch will not work in its current form - not without one more change to FileOutputCommitterContainer. Simply put, FileOutputCommitterContainer.needsTaskCommit() currently returns false when it detects that dynamic partitioning has been used, since it assumes the record writer has already committed the task. With your change, it will need to be updated to return true.

Apart from that, this looks good to me. If you update your patch and set it to patch-available, we can have the tests run on it.
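For concreteness, here is a rough sketch of the kind of change described above. This is an illustration, not the actual Hive source: the dynamicPartitioningUsed flag and the getBaseOutputCommitter() delegation are assumptions about the shape of FileOutputCommitterContainer.

{code}
// Sketch of FileOutputCommitterContainer.needsTaskCommit() after the patch,
// assuming a boolean field (dynamicPartitioningUsed) that marks the
// dynamic-partitioning case.
@Override
public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
    if (dynamicPartitioningUsed) {
        // Previously returned false here, on the assumption that
        // FileRecordWriterContainer.close() had already committed each
        // per-partition writer. With that commit moved out of close(),
        // the task now has output of its own to commit.
        return true;
    }
    // Non-dynamic case: defer to the wrapped base committer as before
    // (conversion of the context to the mapred API elided for brevity).
    return getBaseOutputCommitter().needsTaskCommit(context);
}
{code}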
> Enabling Hadoop speculative execution may cause a corrupt output directory (dynamic partition)
> ------------------------------------------------------------------------------------------
>
>                 Key: HIVE-7803
>                 URL: https://issues.apache.org/jira/browse/HIVE-7803
>             Project: Hive
>          Issue Type: Bug
>          Components: HCatalog
>    Affects Versions: 0.13.1
>         Environment:
>            Reporter: Selina Zhang
>            Assignee: Selina Zhang
>            Priority: Critical
>         Attachments: HIVE-7803.1.patch
>
> One of our users reports intermittent failures caused by attempt directories appearing in the input paths. We found that with speculative execution turned on, two mappers tried to commit the task at the same time using the same committed-task path, which corrupted the output directory.
>
> The original Pig script:
> {code}
> STORE AdvertiserDataParsedClean INTO '$DB_NAME.$ADVERTISER_META_TABLE_NAME'
>     USING org.apache.hcatalog.pig.HCatStorer();
> {code}
>
> Two mapper attempts:
> attempt_1405021984947_5394024_m_000523_0: KILLED
> attempt_1405021984947_5394024_m_000523_1: SUCCEEDED
>
> attempt_1405021984947_5394024_m_000523_0 was killed right after the commit. As a result, it left a corrupt directory
> /projects/.../tablename/_DYN0.7192688458252056/load_time=201408050000/type=complete/_temporary/1/task_1405021984947_5394024_m_000523/
> containing both
> part-m-00523 (from attempt_1405021984947_5394024_m_000523_0)
> and
> attempt_1405021984947_5394024_m_000523_1/part-m-00523
>
> Namenode audit log:
> ==========================
> 1. 2014-08-05 05:04:36,811 INFO FSNamesystem.audit: ugi=* ip=ipaddress1 cmd=create src=/projects/.../tablename/_DYN0.7192688458252056/load_time=201408050000/type=complete/_temporary/1/_temporary/attempt_1405021984947_5394024_m_000523_0/part-m-00523 dst=null perm=user:group:rw-r-----
> 2. 2014-08-05 05:04:53,112 INFO FSNamesystem.audit: ugi=* ip=ipaddress2 cmd=create src=/projects/.../tablename/_DYN0.7192688458252056/load_time=201408050000/type=complete/_temporary/1/_temporary/attempt_1405021984947_5394024_m_000523_1/part-m-00523 dst=null perm=user:group:rw-r-----
> 3. 2014-08-05 05:05:13,001 INFO FSNamesystem.audit: ugi=* ip=ipaddress1 cmd=rename src=/projects/.../tablename/_DYN0.7192688458252056/load_time=201408050000/type=complete/_temporary/1/_temporary/attempt_1405021984947_5394024_m_000523_0 dst=/projects/.../tablename/_DYN0.7192688458252056/load_time=201408050000/type=complete/_temporary/1/task_1405021984947_5394024_m_000523 perm=user:group:rwxr-x---
> 4. 2014-08-05 05:05:13,004 INFO FSNamesystem.audit: ugi=* ip=ipaddress2 cmd=rename src=/projects/.../tablename/_DYN0.7192688458252056/load_time=201408050000/type=complete/_temporary/1/_temporary/attempt_1405021984947_5394024_m_000523_1 dst=/projects/.../tablename/_DYN0.7192688458252056/load_time=201408050000/type=complete/_temporary/1/task_1405021984947_5394024_m_000523 perm=user:group:rwxr-x---
>
> After consulting our Hadoop core team, it was pointed out that some HCat code does not participate in the two-phase commit protocol, for example in FileRecordWriterContainer.close():
> {code}
> for (Map.Entry<String, org.apache.hadoop.mapred.OutputCommitter> entry
>         : baseDynamicCommitters.entrySet()) {
>     org.apache.hadoop.mapred.TaskAttemptContext currContext =
>         dynamicContexts.get(entry.getKey());
>     OutputCommitter baseOutputCommitter = entry.getValue();
>     if (baseOutputCommitter.needsTaskCommit(currContext)) {
>         baseOutputCommitter.commitTask(currContext);
>     }
> }
> {code}
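As a side note for readers unfamiliar with the failure mode: the corruption falls out of HDFS rename semantics - when the destination of FileSystem.rename() already exists as a directory, the source is moved into it rather than the call failing. A minimal standalone sketch, with placeholder paths standing in for the real ones from the audit log above (not actual Hive code):

{code}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CommitRaceSketch {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // Placeholder paths mirroring the audit log entries above.
        Path attempt0 = new Path("/table/_temporary/1/_temporary/attempt_m_000523_0");
        Path attempt1 = new Path("/table/_temporary/1/_temporary/attempt_m_000523_1");
        Path taskDir  = new Path("/table/_temporary/1/task_m_000523");

        fs.rename(attempt0, taskDir); // commit #1: the attempt dir becomes the task dir
        fs.rename(attempt1, taskDir); // commit #2: attempt1 is moved INSIDE taskDir
        // taskDir now contains both part-m-00523 and
        // attempt_m_000523_1/part-m-00523 - the corrupt layout reported above.
    }
}
{code}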