Hi Jun,
Good catch. We do have a cleanup mechanism that removes these partially written 
files so that they do not end up as duplicate files, but that cleanup can itself 
fail because of eventual consistency. 
We have reworked the handling of these failure scenarios and of eventual 
consistency in this PR: https://github.com/apache/incubator-hudi/pull/651 
The initial motivation was to completely avoid writing temp files and renaming 
them, since both are costly operations on cloud storage. As part of this change, 
eventual consistency handling was also redone. The change should handle eventual 
consistency correctly, using fine-grained consistency guards and an optimistic 
approach to handling duplicate file generation.
This change is scheduled to be available in 0.4.7, and we have been vetting it 
with large-scale testing.
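
To illustrate the idea of a consistency guard, here is a minimal sketch. It is 
not the code from the PR: the class name `ConsistencyGuardSketch`, the method 
`waitUntilVisible`, and its parameters are hypothetical. It polls a visibility 
check with exponential backoff instead of failing on the first "file not found". 
In real use the check would be a file system lookup; here it is a plain 
`BooleanSupplier` so the sketch runs on its own.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of a consistency guard; not Hudi's actual implementation.
final class ConsistencyGuardSketch {

    /**
     * Polls isVisible until it returns true or maxAttempts checks have been made,
     * doubling the sleep between checks. Returns false if the file never appears
     * or the thread is interrupted while waiting.
     */
    static boolean waitUntilVisible(BooleanSupplier isVisible, int maxAttempts, long initialDelayMs) {
        long delayMs = initialDelayMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (isVisible.getAsBoolean()) {
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delayMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    return false;
                }
                delayMs *= 2; // exponential backoff
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Simulated eventually consistent store: the file shows up on the third check.
        int[] checks = {0};
        boolean visible = waitUntilVisible(() -> ++checks[0] >= 3, 5, 10);
        System.out.println("visible=" + visible + " after " + checks[0] + " checks");
    }
}
```

A caller would only treat the write as failed once the guard gives up, which 
narrows the window in which a successfully written file is reported as missing.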

Balaji.V

On Monday, May 13, 2019, 8:41:37 PM PDT, Jun Zhu <[email protected]> wrote:

Hi team,
Here is some feedback on an eventual consistency problem in S3.

*Scenario*:
We found files with the same `bucket` number (a variable in the Hudi code) in
the same partition:

2019-05-07 20:21:39  11993262 0806a716-54ee-4343-bc0e-ca26a4cbbbce_*7*_20190507122110.parquet

2019-05-07 20:21:34  11983784 c3790f3b-5a0e-4f2b-b934-3875175f6f9a_*7*_20190507122110.parquet
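
A check like the following could flag such duplicates in a partition listing. 
It is only a sketch: it assumes the file names follow the layout seen above, 
`<fileId>_<bucket>_<commitTime>.parquet`, and the class `DuplicateBucketCheck` 
and the third file name in `main` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; assumes names look like <fileId>_<bucket>_<commitTime>.parquet
final class DuplicateBucketCheck {

    /** Groups file names by "<bucket>_<commitTime>" and keeps only groups with more than one file. */
    static Map<String, List<String>> findDuplicates(List<String> fileNames) {
        Map<String, List<String>> byKey = new LinkedHashMap<>();
        String suffix = ".parquet";
        for (String name : fileNames) {
            if (!name.endsWith(suffix)) {
                continue; // ignore non-parquet files
            }
            String[] parts = name.substring(0, name.length() - suffix.length()).split("_");
            if (parts.length != 3) {
                continue; // name does not match the assumed layout
            }
            String key = parts[1] + "_" + parts[2];
            byKey.computeIfAbsent(key, k -> new ArrayList<>()).add(name);
        }
        // Drop every bucket/commit pair that has exactly one file.
        byKey.values().removeIf(files -> files.size() < 2);
        return byKey;
    }

    public static void main(String[] args) {
        List<String> listing = Arrays.asList(
            "0806a716-54ee-4343-bc0e-ca26a4cbbbce_7_20190507122110.parquet",
            "c3790f3b-5a0e-4f2b-b934-3875175f6f9a_7_20190507122110.parquet",
            // Made-up entry for a different bucket, for contrast.
            "11111111-2222-3333-4444-555555555555_8_20190507122110.parquet");
        findDuplicates(listing).forEach((key, files) ->
            System.out.println("bucket_commit " + key + " has " + files.size() + " files"));
    }
}
```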

*Exception in spark log*:

> 19/05/07 12:21:34 WARN TaskSetManager: Lost task 7.0 in stage 1709.0 (TID 289978, ip-172-19-111-50, executor 7): java.lang.RuntimeException: com.uber.hoodie.exception.HoodieException: com.uber.hoodie.exception.HoodieException: java.util.concurrent.ExecutionException: com.uber.hoodie.exception.HoodieInsertException: Failed to close the Insert Handle for path s3a://vungle2-dataeng/jun-test/stagebugfix20190507/2019-05-07_12/c3790f3b-5a0e-4f2b-b934-3875175f6f9a_7_20190507122110.parquet
> at com.uber.hoodie.func.LazyIterableIterator.next(LazyIterableIterator.java:121)
> at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:378)
> at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1109)
> at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
> at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
> at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:109)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: com.uber.hoodie.exception.HoodieException: com.uber.hoodie.exception.HoodieException: java.util.concurrent.ExecutionException: com.uber.hoodie.exception.HoodieInsertException: Failed to close the Insert Handle for path s3a://vungle2-dataeng/jun-test/stagebugfix20190507/2019-05-07_12/c3790f3b-5a0e-4f2b-b934-3875175f6f9a_7_20190507122110.parquet
> at com.uber.hoodie.func.CopyOnWriteLazyInsertIterable.computeNext(CopyOnWriteLazyInsertIterable.java:106)
> at com.uber.hoodie.func.CopyOnWriteLazyInsertIterable.computeNext(CopyOnWriteLazyInsertIterable.java:45)
> at com.uber.hoodie.func.LazyIterableIterator.next(LazyIterableIterator.java:119)
> ... 20 more
> Caused by: com.uber.hoodie.exception.HoodieException: java.util.concurrent.ExecutionException: com.uber.hoodie.exception.HoodieInsertException: Failed to close the Insert Handle for path s3a://vungle2-dataeng/jun-test/stagebugfix20190507/2019-05-07_12/c3790f3b-5a0e-4f2b-b934-3875175f6f9a_7_20190507122110.parquet
> at com.uber.hoodie.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:146)
> at com.uber.hoodie.func.CopyOnWriteLazyInsertIterable.computeNext(CopyOnWriteLazyInsertIterable.java:102)
> ... 22 more
> Caused by: java.util.concurrent.ExecutionException: com.uber.hoodie.exception.HoodieInsertException: Failed to close the Insert Handle for path s3a://vungle2-dataeng/jun-test/stagebugfix20190507/2019-05-07_12/c3790f3b-5a0e-4f2b-b934-3875175f6f9a_7_20190507122110.parquet
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at com.uber.hoodie.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:144)
> ... 23 more
> Caused by: com.uber.hoodie.exception.HoodieInsertException: Failed to close the Insert Handle for path s3a://vungle2-dataeng/jun-test/stagebugfix20190507/2019-05-07_12/c3790f3b-5a0e-4f2b-b934-3875175f6f9a_7_20190507122110.parquet
> at com.uber.hoodie.io.HoodieCreateHandle.close(HoodieCreateHandle.java:177)
> at com.uber.hoodie.func.CopyOnWriteLazyInsertIterable$CopyOnWriteInsertHandler.finish(CopyOnWriteLazyInsertIterable.java:168)
> at com.uber.hoodie.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:42)
> at com.uber.hoodie.common.util.queue.BoundedInMemoryExecutor.lambda$null$77(BoundedInMemoryExecutor.java:124)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> ... 3 more
> Caused by: java.io.FileNotFoundException: No such file or directory: s3a://vungle2-dataeng/jun-test/stagebugfix20190507/2019-05-07_12/c3790f3b-5a0e-4f2b-b934-3875175f6f9a_7_20190507122110.parquet
> at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:993)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77)
> at com.uber.hoodie.common.util.FSUtils.getFileSize(FSUtils.java:126)
> at com.uber.hoodie.io.HoodieCreateHandle.close(HoodieCreateHandle.java:168)
> ... 7 more


*Cause*:
After the file is written, FSUtils fails to find it at this line:
https://github.com/apache/incubator-hudi/blob/master/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java#L168

So it throws an exception, and another file for bucket 7 is written that is
exactly the same as the "failed" one, which causes duplicates in this partition.

Is there anything we can do to avoid this case?

Thanks,
Jun
-- 
*Jun Zhu*
Sr. Engineer I, Data
+86 18565739171
