Hi Vinoth,

Context
The application I wrote is generic: it is meant to ingest the many CSV and 
other data files lying around in organizations. Take the case of typical stock 
trading information contained in these files. Each file has, say, customer_id 
as the primary key and a transaction timestamp. As one can imagine, the issue 
with loading such files is that updates can arrive out of order. There are 2 
options:
Developer ensures that files are uploaded in timeline order, OR

Write intelligent code in HUDI that can reject outdated updates of records.

I have taken the second approach, by implementing the routine below to suit my 
requirement.

// Routine to implement
public Optional<IndexedRecord> combineAndGetUpdateValue(IndexedRecord oldRec, Schema schema)
This also means we do not have to worry about data corruption if the 
application crashes and the file reload process has to be restarted.
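
For illustration, here is a minimal sketch of the kind of logic I plug into that routine. It assumes a HoodieRecordPayload-style payload class; the class name and the timestamp field "ts" are just placeholders for my actual implementation:

import java.util.Optional;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

// Sketch only: reject updates whose timestamp is older than what is already stored.
// The field name "ts" is a placeholder for the transaction timestamp column.
public class LatestTimestampPayload {

  private final GenericRecord incoming; // record arriving from the file being loaded

  public LatestTimestampPayload(GenericRecord incoming) {
    this.incoming = incoming;
  }

  public Optional<IndexedRecord> combineAndGetUpdateValue(IndexedRecord oldRec, Schema schema) {
    long storedTs = (Long) ((GenericRecord) oldRec).get("ts"); // timestamp already in the table
    long incomingTs = (Long) incoming.get("ts");               // timestamp of the incoming update
    if (incomingTs < storedTs) {
      return Optional.of(oldRec);   // incoming record is outdated: keep the stored value
    }
    return Optional.of(incoming);   // otherwise accept the update
  }
}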

Application
The first cut of the application had Spark read a folder of files and then 
process each file separately. After processing 30 files the S3 connection 
limit would be breached, so I thought I would get away with it by running one 
spark-submit per file. That did help to process files until 2 years' worth of 
daily partitioned data was loaded.
Each file is split into 2 distinct data frames that are written to 2 distinct 
tables. With 2+ years of processed data, each df.write would create 750+ 
connections and breach the 1500 S3 connection limit. This effectively made HUDI 
impossible to use for any dataset that could grow to a timeline of 3 years.
I have now increased the S3 connection limit to 5000 with a view to finding the 
root cause and fixing it with your help. This is just a time-buying exercise 
and not something that will scale.
When I said 3 levels down: the table is partitioned by date, e.g. 2019/03/01, 
under the base path. Each call to such a partition results in a distinct file 
being accessed, i.e. it creates a single S3 connection.
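
To make the write pattern concrete, below is a rough sketch of what each per-file spark-submit does. The table paths, column names, the split condition and the exact Hudi option keys are illustrative placeholders only, and the fs.s3a.connection.maximum setting is how I raised the connection limit to 5000 (assuming the S3A connector is in use):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

// Rough sketch of one per-file load; paths, column names and option keys are placeholders.
public class LoadOneFile {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("hudi-load-one-file")
        // Time-buying workaround: raise the S3A connection pool limit.
        .config("spark.hadoop.fs.s3a.connection.maximum", "5000")
        .getOrCreate();

    Dataset<Row> raw = spark.read().option("header", "true").csv(args[0]); // one daily csv file

    // Each file is split into 2 data frames that go to 2 distinct HUDI tables.
    Dataset<Row> df1 = raw.filter("record_type = 'trade'");
    Dataset<Row> df2 = raw.filter("record_type = 'quote'");

    writeHudi(df1, "s3://bucket/hudi/table1"); // 750+ date partitions (e.g. .../2019/03/01/...) => 750+ connections
    writeHudi(df2, "s3://bucket/hudi/table2"); // another 750+ connections on top of the first write

    spark.stop();
  }

  private static void writeHudi(Dataset<Row> df, String basePath) {
    df.write()
        .format("com.uber.hoodie")                                        // Hudi Spark datasource
        .option("hoodie.table.name", basePath.substring(basePath.lastIndexOf('/') + 1))
        .option("hoodie.datasource.write.recordkey.field", "customer_id") // primary key in the files
        .option("hoodie.datasource.write.precombine.field", "ts")         // transaction timestamp
        .option("hoodie.datasource.write.partitionpath.field", "date")    // gives 2019/03/01 style partitions
        .mode(SaveMode.Append)
        .save(basePath);
  }
}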
Further Steps
I think it should be possible to replicate this scenario on a Cloudera HDFS 
cluster. I have included a snapshot of one such chart from the Cloudera library, 
which can be monitored before, during, and after closing the application to see 
how many HDFS connections are being opened.
Thank you for all the responses.
Kabeer.

On Mar 25 2019, at 11:43 pm, Vinoth Chandar <[email protected]> wrote:
> Hi Kabeer,
>
> Actually knowing things like S3 creates connections three level down helps
> us immensely already! :) We typically test with HDFS which just talks to
> namenode for RPC.
>
> For context, are you running hudi in a streaming job that does not exit?
> Thanks
> Vinoth
>
> On Mon, Mar 25, 2019 at 2:58 PM Kabeer Ahmed <[email protected]> wrote:
> > Thank you Vinoth and Balaji. As soon as you have a patch, I can get the
> > tests going on S3 and relay back the results.
> >
> > Vinoth:
> > My proficiency with the code can never be better than yours or Balaji's. I
> > have not looked in depth at whether fs calls (getPartitions()) are
> > translated into filesystem calls.
> >
> > But as far as I understand S3, even a simple get-partition call that goes 3
> > levels down to retrieve the objects results in an S3 connection. Let me give
> > a gist of what we do in the pseudo-code below.
> >
> > insertHudiRecords()
> > {
> >   // Prepare 1st HUDI DF to write
> >   df1.write.format(com.uber.hudi).... // S3 connections increase by 750, in line with # of partitions we have
> >
> >   // Prepare another HUDI DF to write
> >   df2.write.format(com.uber.hudi)... // S3 connections are at the 750 level & another 750 are added on top
> > }
> > // S3 connections are released only when the Spark process in the above routine is finished, i.e. the actual application exits()
> > Thank you for all your responses.
> > On Mar 25 2019, at 4:15 pm, [email protected] wrote:
> > > +1, incremental cleaning is scheduled work. I will be working on this
> > > immediately after HUDI-1.
> > > Balaji.V
> > > On Sunday, March 24, 2019, 7:42:03 PM PDT, Vinoth Chandar <[email protected]> wrote:
> > >
> > > Hi Kabeer,
> > > You are right. HUDI-1 alone won't be sufficient. We need to do a follow-on.
> > > IIRC this is already planned work (Balaji?)
> > > Filed https://issues.apache.org/jira/browse/HUDI-80 to separate this from
> > > HUDI-1..
> > >
> > > On to the issue you are facing, seems like the connections to S3 keep
> > > hanging around? Don't think cleaning actually opens any files, simply
> > > lists and deletes. We could call a fs.close which probably shuts the
> > > connections down. But, need to think through that more, since fs caching
> > > is a tricky issue.. https://issues.apache.org/jira/browse/HUDI-81 filed
> > > this separately to track this. If you can help me track the connections
> > > to S3 etc, I can take a stab and maybe we can test the patch in your
> > > environment?
> > >
> > > We can work on the ticket. Please share your jira id, so I can add you as
> > > a contributor, giving you commenting etc on jira.
> > >
> > > Thanks
> > > Vinoth
> > >
> > >
> > >
> > > On Sun, Mar 24, 2019 at 2:11 PM Kabeer Ahmed <[email protected]> wrote:
> > > > Hi Vinoth,
> > > > Thank you for your response. I thought of reducing cleaner parallelism,
> > > > which is Min(200, table_partitions). But it wouldn't have an effect as,
> > > > regardless of parallelism, there will be an attempt to scan all files
> > > > (though reduced parallelism might slow the process).
> > > > So as stated, in a table with 750+ partitions I did notice that
> > > > connections would increase, and I have now been forced to keep the S3
> > > > connection limit at 5k due to this issue.
> > > > I also looked into the brief description of the jira:
> > > > https://issues.apache.org/jira/browse/HUDI-1
> > > > This is a very nice optimisation to have but I don't think it will help
> > > > alleviate the concerns on S3. On HDFS, this jira will definitely help
> > > > reduce the # of name node connections, but S3 objects will need to be
> > > > opened to clear them and the problem will not go away.
> > > > I think the effective work has to be along the lines of reworking how
> > > > the partitions are cleaned in the routine below:
> > > >
> > > > // File: HoodieCopyOnWriteTable.java
> > > > public List<HoodieCleanStat> clean(JavaSparkContext jsc) {
> > > >   try {
> > > >     FileSystem fs = getMetaClient().getFs();
> > > >     List<String> partitionsToClean = FSUtils
> > > >         .getAllPartitionPaths(fs, getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning());
> > > >     logger.info("Partitions to clean up : " + partitionsToClean + ", with policy " + config.getCleanerPolicy());
> > > >     if (partitionsToClean.isEmpty()) {
> > > >       logger.info("Nothing to clean here mom. It is already clean");
> > > >       return Collections.emptyList();
> > > >     }
> > > >     return cleanPartitionPaths(partitionsToClean, jsc);
> > > >   } catch (IOException e) {
> > > >     throw new HoodieIOException("Failed to clean up after commit", e);
> > > >   }
> > > > }
> > > > In the above routine, none of the connections that are opened get closed.
> > > > I think the work should be along the lines of closing the connections in
> > > > this routine after the cleaning operation (i.e. file close logic added so
> > > > that it is executed in parallel for every file opened by Spark executors).
> > > >
> > > > Please feel free to correct me if you think I have goofed up somewhere.
> > > > Thanks
> > > > Kabeer.
> > > >
> > > > PS: There is so much going on and there is a need to progress with the
> > > > stuff at hand at work. Otherwise would have loved to spend time and
> > > > send a PR.
> > > > On Mar 24 2019, at 7:04 am, Vinoth Chandar <[email protected]> wrote:
> > > > > Hi Kabeer,
> > > > >
> > > > > No need to apologize :)
> > > > > Mailing list works a lot better for reporting issues. We can respond
> > > > > much quicker, since it's not buried with all the other github events.
> > > > >
> > > > > On what you saw, the cleaner does list all partitions currently. Have
> > > > > you tried reducing cleaner parallelism if limiting connections is your
> > > > > goal?
> > > > >
> > > > > Also some good news is, once
> > > > > https://issues.apache.org/jira/browse/HUDI-1 is landed (currently being
> > > > > reviewed), a follow on is to rework the cleaner incrementally on top,
> > > > > which should help a lot here.
> > > > >
> > > > >
> > > > > Thanks
> > > > > Vinoth
> > > > >
> > > > > On Sat, Mar 23, 2019 at 7:39 PM Kabeer Ahmed <[email protected]> wrote:
> > > > > > Hi,
> > > > > > I have just raised this issue and thought to share with the community
> > > > > > if someone else is experiencing this. Apologies in advance if this is
> > > > > > a redundant email.
> > > > > > Thanks
> > > > > > Kabeer.
>
>
