Hi Jun,

I was able to trace this down: HoodieSparkSQLWriter (the common path for the
streaming sink and the batch datasource) ends up calling
DataSourceUtils.createHoodieClient, which creates the client as follows:

return new HoodieWriteClient<>(jssc, writeConfig);

There is a third parameter that denotes whether the writer needs to
roll back inflights. For example, DeltaStreamer invokes:

HoodieWriteClient client = new HoodieWriteClient<>(jssc, hoodieCfg, true);

While I track down why we have this difference, could you try changing this
one line here to add a third "true" argument, and give it a shot?
https://github.com/apache/incubator-hudi/blob/b34a204a527a156406908686e54484a0c3d8a3d7/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java#L148
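
For reference, here is what the changed line would look like (a sketch; per
the DeltaStreamer usage above, the third boolean tells the client to roll
back inflights on startup):

return new HoodieWriteClient<>(jssc, writeConfig, true);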


Thanks
Vinoth

On Tue, Apr 30, 2019 at 11:16 PM [email protected] <[email protected]>
wrote:

>
> Hi Jun,
> You had mentioned that you are seeing the log message
>  "insert failed with 1 errors".
> Did you see any exception stack traces before this message? You can also
> take a look at the Spark UI to check the stdout/stderr of failed tasks
> (if present).
> Also, it looks like if you enable "trace"-level logging, you would see
> the exceptions getting logged at the end. So, enabling "trace"-level
> logging is another way to debug what is happening:
> log.error(s"$operation failed with ${errorCount} errors :")
> if (log.isTraceEnabled) {
>   log.trace("Printing out the top 100 errors")
>   ...
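>
> For example (a sketch, assuming log4j is the logging backend and the Hudi
> classes live under com.uber.hoodie), one log4j.properties line should turn
> this on:
>
> log4j.logger.com.uber.hoodie=TRACE
>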
> Balaji.V
>
>     On Tuesday, April 30, 2019, 8:17:57 AM PDT, Vinoth Chandar <
> [email protected]> wrote:
>
>  Hi Jun,
>
> Basically, you are saying the streaming path leaves some inflights behind.
> Let me see if I can reproduce it. If you have a simple test case, please
> share it.
>
> Thanks
> Vinoth
>
> On Tue, Apr 30, 2019 at 1:04 AM Jun Zhu <[email protected]>
> wrote:
>
> > Hi Vinoth,
> > In the Spark streaming log I find "2019-04-30 03:26:11 ERROR
> > HoodieSparkSQLWriter:182 - insert failed with 1 errors :" (no further
> > error logs), during which the commit ends up inflight and is not cleaned.
> > Just for feedback: we can dedup data correctly the batch way. I think we
> > should add more exception-handling logic when using Spark Streaming.
> > Regards,
> > Jun
> >
> >
> > On Tue, Apr 30, 2019 at 2:46 AM Vinoth Chandar <[email protected]>
> > wrote:
> >
> > > Another option to try would be setting
> > > spark.sql.hive.convertMetastoreParquet=false, if you are querying via
> > > the Hive table registered by Hudi.
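> > >
> > > For example, one way to pass it (a sketch; it can also be set as a
> > > session conf in spark-shell):
> > >
> > > spark-shell --conf spark.sql.hive.convertMetastoreParquet=false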
> > >
> > > On Sat, Apr 27, 2019 at 7:02 PM Jun Zhu <[email protected]>
> > > wrote:
> > >
> > > > Thanks for the explanation, Vinoth. The code was the same as listed
> > > > in https://github.com/apache/incubator-hudi/issues/639, with the
> > > > table format set via
> > > > `.option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY,
> > > > DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)`.
> > > > The resulting data was stored on AWS S3.
> > > > I will try more on
> > > > `spark.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class",
> > > >   classOf[com.uber.hoodie.hadoop.HoodieROTablePathFilter],
> > > >   classOf[org.apache.hadoop.fs.PathFilter])`.
> > > > From the behavior we are seeing, the config may not be taking effect.
> > > >
> > > > On Sat, Apr 27, 2019 at 12:09 AM Vinoth Chandar <[email protected]>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > >> The duplicates were found in inflight commit parquet files.
> > > > > >> Wondering if this was expected?
> > > > > The Spark shell should not even be reading in-flight parquet files.
> > > > > Can you double-check that the Spark access is properly configured?
> > > > > http://hudi.apache.org/querying_data.html#spark
> > > > >
> > > > > Inflights should be rolled back at the start of the next
> > > > > commit/delta commit. Not sure why there are so many inflight delta
> > > > > commits. If you can give a reproducible case, happy to debug it
> > > > > more.
> > > > >
> > > > > Only completed instants are archived. So yes, inflights are not
> > > > > archived.
> > > > >
> > > > > Hope that helps
> > > > >
> > > > > Thanks
> > > > > Vinoth
> > > > >
> > > > > On Fri, Apr 26, 2019 at 2:09 AM Jun Zhu <[email protected]>
> > > > > wrote:
> > > > >
> > > > > > Hi Vinoth,
> > > > > > Some follow-up questions on this thread.
> > > > > > Here is what I found after running for a few days:
> > > > > > In the .hoodie folder, maybe due to the retention policy, there is
> > > > > > an obvious dividing line (listed at the end of this email). Before
> > > > > > it, the cleaned commits were archived, and I find duplication when
> > > > > > querying the partition corresponding to an inflight commit via
> > > > > > spark-shell. After the line, everything behaves normally and
> > > > > > global dedup works.
> > > > > > The duplicates were found in inflight commit parquet files.
> > > > > > Wondering if this was expected?
> > > > > > Q:
> > > > > > 1. An inflight commit should be rolled back by the next write. Is
> > > > > > it normal that so many inflight commits did not make it? Or can I
> > > > > > configure a retention policy that rolls back inflights in another
> > > > > > way?
> > > > > > 2. Does the commit retention policy not archive inflight commits?
> > > > > >
> > > > > > 2019-04-23 20:23:47        378  20190423122339.deltacommit.inflight
> > > > > > 2019-04-23 20:43:53        378  20190423124343.deltacommit.inflight
> > > > > > 2019-04-23 22:14:04        378  20190423141354.deltacommit.inflight
> > > > > > 2019-04-23 22:44:09        378  20190423144400.deltacommit.inflight
> > > > > > 2019-04-23 22:54:18        378  20190423145408.deltacommit.inflight
> > > > > > 2019-04-23 23:04:09        378  20190423150400.deltacommit.inflight
> > > > > > 2019-04-23 23:24:30        378  20190423152421.deltacommit.inflight
> > > > > > *2019-04-23 23:44:34        378  20190423154424.deltacommit.inflight*
> > > > > > *2019-04-24 00:15:46      2991  20190423161431.clean*
> > > > > > 2019-04-24 00:15:21    870536  20190423161431.deltacommit
> > > > > > 2019-04-24 00:25:19      2991  20190423162424.clean
> > > > > > 2019-04-24 00:25:09    875825  20190423162424.deltacommit
> > > > > > 2019-04-24 00:35:26      2991  20190423163429.clean
> > > > > > 2019-04-24 00:35:18    881925  20190423163429.deltacommit
> > > > > > 2019-04-24 00:46:14      2991  20190423164428.clean
> > > > > > 2019-04-24 00:45:44    888025  20190423164428.deltacommit
> > > > > >
> > > > > > Thanks,
> > > > > > Jun
> > > > > >
> > > > > > On 2019/04/18 14:29:23, Vinoth Chandar <[email protected]> wrote:
> > > > > > > Hi Jun,
> > > > > > >
> > > > > > > Responses below.
> > > > > > >
> > > > > > > >> 1. Some inflight files may never reach commit?
> > > > > > > Yes. The next attempt at writing will first issue a rollback to
> > > > > > > clean up such partial/leftover files, before it begins the new
> > > > > > > commit.
> > > > > > >
> > > > > > > >> 2. In the case where an inflight commit and the parquet files
> > > > > > > it generated still exist, will global dedup skip such files?
> > > > > > > Even if not rolled back, we check the inflight parquet files
> > > > > > > against the committed timeline, which they won't be a part of.
> > > > > > > So it should be safe.
> > > > > > >
> > > > > > > >> 3. In the same case, will the correct query result be decided
> > > > > > > by the read config (I mean mapreduce.input.pathFilter.class in
> > > > > > > Spark SQL)?
> > > > > > > Yes. The filtering should work as well; it's the same technique
> > > > > > > used by the writer.
> > > > > > >
> > > > > > > >> 4. Is there any way we can use
> > > > > > > >> `spark.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class",
> > > > > > > >>   classOf[com.uber.hoodie.hadoop.HoodieROTablePathFilter],
> > > > > > > >>   classOf[org.apache.hadoop.fs.PathFilter])`
> > > > > > > in the Spark thrift server when starting it?
> > > > > > >
> > > > > > > I am not familiar with the Spark thrift server myself. Any
> > > > > > > pointers to where I can learn more?
> > > > > > > Two suggestions:
> > > > > > > - You can check if you can add this to the Hadoop configuration
> > > > > > > xml files and see if it gets picked up by Spark.
> > > > > > > - Alternatively, you can set the spark config mentioned here:
> > > > > > > http://hudi.apache.org/querying_data.html#spark-rt-view (works
> > > > > > > for the ro view also), which I am assuming should be doable at
> > > > > > > this thrift server (see the sketch below).
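> > > > > > >
> > > > > > > For example, something like this might work, since Spark copies
> > > > > > > any spark.hadoop.* conf into the Hadoop Configuration (a sketch;
> > > > > > > exact script path and flag placement assumed):
> > > > > > >
> > > > > > > sbin/start-thriftserver.sh \
> > > > > > >   --conf spark.hadoop.mapreduce.input.pathFilter.class=com.uber.hoodie.hadoop.HoodieROTablePathFilter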
> > > > > > >
> > > > > > >
> > > > > > > Thanks
> > > > > > > Vinoth
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Apr 17, 2019 at 12:08 AM Jun Zhu <[email protected]>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > > Link: https://github.com/apache/incubator-hudi/issues/639
> > > > > > > > Sorry, I failed to open
> > > > > > > > https://lists.apache.org/[email protected]
> > > > > > > > I have some follow-up questions for issue 639:
> > > > > > > >
> > > > > > > > > So, the sequence of events is: we write parquet files, and
> > > > > > > > > then upon successful writing of all attempted parquet files,
> > > > > > > > > we actually mark the commit as completed (i.e., not inflight
> > > > > > > > > anymore). So this is normal. This is done to prevent queries
> > > > > > > > > from reading partially written parquet files.
> > > > > > > >
> > > > > > > > Does that mean:
> > > > > > > > 1. Some inflight files may never reach commit?
> > > > > > > > 2. In the case where an inflight commit and the parquet files
> > > > > > > > it generated still exist, will global dedup skip such files?
> > > > > > > > 3. In the same case, will the correct query result be decided
> > > > > > > > by the read config (I mean mapreduce.input.pathFilter.class in
> > > > > > > > Spark SQL)?
> > > > > > > > 4. Is there any way we can use
> > > > > > > > `spark.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class",
> > > > > > > >   classOf[com.uber.hoodie.hadoop.HoodieROTablePathFilter],
> > > > > > > >   classOf[org.apache.hadoop.fs.PathFilter])`
> > > > > > > > in the Spark thrift server when starting it?
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > --
> > > > > > > > *Jun Zhu*
> > > > > > > > Sr. Engineer I, Data
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > *Jun Zhu*
> > > > Sr. Engineer I, Data
> > > >
> > >
> >
> >
> > --
> > *Jun Zhu*
> > Sr. Engineer I, Data
> >
