Hi Jun,

I was able to track down that HoodieSparkSQLWriter (the common path for the streaming sink and the batch datasource) ends up calling DataSourceUtils.createHoodieClient, which creates the client as follows:

    return new HoodieWriteClient<>(jssc, writeConfig);

There is a third parameter that denotes whether the writer needs to roll back inflights. For example, DeltaStreamer invokes:

    HoodieWriteClient client = new HoodieWriteClient<>(jssc, hoodieCfg, true);

While I track down why we have this difference, could you try changing this one line to pass "true" as the third argument and give it a shot?
https://github.com/apache/incubator-hudi/blob/b34a204a527a156406908686e54484a0c3d8a3d7/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java#L148
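If you want to experiment from a Scala job in the meantime, here is a minimal sketch of constructing the client with inflight rollback enabled, the way DeltaStreamer does. The base path, table name, and config builder calls are illustrative only, not taken from your job:

    import com.uber.hoodie.HoodieWriteClient
    import com.uber.hoodie.config.HoodieWriteConfig
    import org.apache.spark.api.java.JavaSparkContext

    val jssc = new JavaSparkContext(spark.sparkContext)
    // illustrative config; build yours the way DataSourceUtils does
    val writeConfig = HoodieWriteConfig.newBuilder()
      .withPath("s3://bucket/basePath")
      .forTable("my_table")
      .build()
    // third argument = roll back pending inflights before writing
    val client = new HoodieWriteClient(jssc, writeConfig, true)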
Thanks
Vinoth

On Tue, Apr 30, 2019 at 11:16 PM [email protected] <[email protected]> wrote:

Hi Jun,

You had mentioned that you are seeing the log message "insert failed with 1 errors". Did you see any exception stack traces before this message? You can also take a look at the Spark UI at the stdout/stderr of failed tasks (if present).

Also, it looks like if you enable "trace" level logging, you would see the exceptions getting logged at the end. So enabling "trace" level logging is another way to debug what is happening:

'''
log.error(s"$operation failed with ${errorCount} errors :");
if (log.isTraceEnabled) {
  log.trace("Printing out the top 100 errors") .......
'''
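One way to flip that logger to trace from inside a spark-shell session, as a sketch (log4j 1.x, which Spark ships with; adjust the logger name if your errors are emitted elsewhere):

    import org.apache.log4j.{Level, Logger}

    // the com.uber.hoodie writers emit the "failed with N errors" message
    Logger.getLogger("com.uber.hoodie").setLevel(Level.TRACE)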
Balaji.V

On Tuesday, April 30, 2019, 8:17:57 AM PDT, Vinoth Chandar <[email protected]> wrote:

Hi Jun,

Basically you are saying the streaming path leaves some inflights behind.. let me see if I can reproduce it. If you have a simple test case, please share.

Thanks
Vinoth

On Tue, Apr 30, 2019 at 1:04 AM Jun Zhu <[email protected]> wrote:

Hi Vinoth,

In the spark streaming log I find "2019-04-30 03:26:11 ERROR HoodieSparkSQLWriter:182 - insert failed with 1 errors :" (with no further error logs), during which the commit ended up inflight and was not cleaned. Just for feedback: we can dedup data correctly the batch way. I think more exception-handling logic should be added when writing via spark streaming.

Regards,
Jun

On Tue, Apr 30, 2019 at 2:46 AM Vinoth Chandar <[email protected]> wrote:

Another option to try would be setting spark.sql.hive.convertMetastoreParquet=false, if you are querying via the Hive table registered by Hudi.
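In a spark-shell or JDBC session that would be, for instance (a sketch; the flag makes Spark use the table's registered input format instead of its built-in parquet reader, which is what lets Hudi filter out inflight files):

    spark.sql("set spark.sql.hive.convertMetastoreParquet=false")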
On Sat, Apr 27, 2019 at 7:02 PM Jun Zhu <[email protected]> wrote:

Thanks for the explanation, Vinoth. The code was the same as listed in https://github.com/apache/incubator-hudi/issues/639, with the table format set via .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL), and the resulting data stored on AWS S3. I will try more with

    spark.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class",
      classOf[com.uber.hoodie.hadoop.HoodieROTablePathFilter],
      classOf[org.apache.hadoop.fs.PathFilter]);

From what I observe, the config may not be taking effect.
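For reference, a sketch of the write path being described, assembled from issue 639 and this thread; the input frame, key/partition/precombine fields, table name, and S3 path are all placeholders:

    import com.uber.hoodie.DataSourceWriteOptions
    import com.uber.hoodie.config.HoodieWriteConfig
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions.lit

    // placeholder input; substitute the real frame from issue 639
    val inputDF = spark.range(10).toDF("id")
      .withColumn("dt", lit("2019-04-30"))
      .withColumn("ts", lit(0L))

    inputDF.write
      .format("com.uber.hoodie")
      .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY,
        DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
      .option(HoodieWriteConfig.TABLE_NAME, "my_table")
      .mode(SaveMode.Append)
      .save("s3://bucket/basePath")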
On Sat, Apr 27, 2019 at 12:09 AM Vinoth Chandar <[email protected]> wrote:

Hi,

>> The duplicates were found in the parquet files of inflight commits. Wondering if this is expected?
Spark shell should not even be reading in-flight parquet files. Can you double check that the spark access is properly configured? http://hudi.apache.org/querying_data.html#spark

Inflights should be rolled back at the start of the next commit/delta commit.. Not sure why there are so many inflight delta commits. If you can give a reproducible case, happy to debug it more..

Only complete instants are archived.. So yes, inflights are not archived..

Hope that helps

Thanks
Vinoth

On Fri, Apr 26, 2019 at 2:09 AM Jun Zhu <[email protected]> wrote:

Hi Vinoth,

Some follow-up questions on this thread. Here is what I found after running for a few days: in the .hoodie folder, perhaps due to the retention policy, there is an obvious dividing line (listed at the end of this email). Before it, the cleaned commits were archived, and I find duplication when querying the partitions corresponding to inflight commits via spark-shell. After the line, everything behaves normally and global dedup works. The duplicates were found in the parquet files of inflight commits. Wondering if this is expected?

Questions:
1. An inflight commit should be rolled back by the next write. Is it normal that so many inflight commits never were? Or can I configure a retention policy that rolls inflights back another way?
2. Does the commit retention policy not archive inflight commits?

2019-04-23 20:23:47      378  20190423122339.deltacommit.inflight
2019-04-23 20:43:53      378  20190423124343.deltacommit.inflight
2019-04-23 22:14:04      378  20190423141354.deltacommit.inflight
2019-04-23 22:44:09      378  20190423144400.deltacommit.inflight
2019-04-23 22:54:18      378  20190423145408.deltacommit.inflight
2019-04-23 23:04:09      378  20190423150400.deltacommit.inflight
2019-04-23 23:24:30      378  20190423152421.deltacommit.inflight
*2019-04-23 23:44:34     378  20190423154424.deltacommit.inflight*
*2019-04-24 00:15:46    2991  20190423161431.clean*
2019-04-24 00:15:21   870536  20190423161431.deltacommit
2019-04-24 00:25:19     2991  20190423162424.clean
2019-04-24 00:25:09   875825  20190423162424.deltacommit
2019-04-24 00:35:26     2991  20190423163429.clean
2019-04-24 00:35:18   881925  20190423163429.deltacommit
2019-04-24 00:46:14     2991  20190423164428.clean
2019-04-24 00:45:44   888025  20190423164428.deltacommit

Thanks,
Jun

On 2019/04/18 14:29:23, Vinoth Chandar <[email protected]> wrote:

Hi Jun,

Responses below.

>> 1. Some inflight files may never reach commit?
Yes. The next attempt at writing will first issue a rollback to clean up such partial/leftover files, before it begins the new commit.

>> 2. In the case where an inflight commit and the parquet files it generated still exist, global dedup will not dedup against such files?
Even if they are not rolled back, we check the inflight parquet files against the committed timeline, which they won't be a part of. So it should be safe.

>> 3. In the case where an inflight commit and its parquet files still exist, is the correct query result decided by the read config (I mean mapreduce.input.pathFilter.class in Spark SQL)?
Yes, the filtering should work as well. It's the same technique used by the writer.

>> 4. Is there any way we can use spark.sparkContext.hadoopConfiguration.setClass(...) in the spark thrift server when starting it?
I am not familiar with the Spark thrift server myself. Any pointers to where I can learn more? Two suggestions:
- You can check if adding this to the Hadoop configuration xml files gets picked up by Spark.
- Alternatively, you can set the spark config mentioned here: http://hudi.apache.org/querying_data.html#spark-rt-view (works for the ro view also), which I am assuming should be doable at the thrift server.

Thanks
Vinoth

On Wed, Apr 17, 2019 at 12:08 AM Jun Zhu <[email protected]> wrote:

Hi,

Link: https://github.com/apache/incubator-hudi/issues/639
Sorry, I failed to open https://lists.apache.org/[email protected].
I have some follow up questions for issue 639:

> So, the sequence of events is: we write parquet files, and then upon successful writing of all attempted parquet files, we actually mark the commit as completed (i.e. not inflight anymore). So this is normal. This is done to prevent queries from reading partially written parquet files.

Does that mean:
1. Some inflight files may never reach commit?
2. In the case where an inflight commit and the parquet files it generated still exist, global dedup will not dedup against such files?
3. In the case where an inflight commit and its parquet files still exist, is the correct query result decided by the read config (I mean mapreduce.input.pathFilter.class in Spark SQL)?
4. Is there any way we can use

    spark.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class",
      classOf[com.uber.hoodie.hadoop.HoodieROTablePathFilter],
      classOf[org.apache.hadoop.fs.PathFilter]);

in the spark thrift server when starting it?

Best,
Jun
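On question 4, one more hedged idea: the same filter can also be registered by class name rather than via classOf, which would make it settable at process startup (for instance as --conf spark.hadoop.mapreduce.input.pathFilter.class=com.uber.hoodie.hadoop.HoodieROTablePathFilter, since spark.hadoop.* confs are copied into the Hadoop configuration). Whether the thrift server honors it for these reads is exactly the open question above. A sketch of the string-based form in a session:

    // string-based equivalent of the setClass call above;
    // Hadoop resolves the class by name when the filter is applied
    spark.sparkContext.hadoopConfiguration.set(
      "mapreduce.input.pathFilter.class",
      "com.uber.hoodie.hadoop.HoodieROTablePathFilter")

    // after which RO reads of the base path should skip inflight files
    val df = spark.read.parquet("s3://bucket/basePath/*/*")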
