[ 
https://issues.apache.org/jira/browse/NIFI-1432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Obaidul Karim updated NIFI-1432:
--------------------------------
    Attachment: sample2g.csv.tar.gz

> Efficient line by line csv processor
> ------------------------------------
>
>                 Key: NIFI-1432
>                 URL: https://issues.apache.org/jira/browse/NIFI-1432
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Core Framework
>    Affects Versions: 0.4.1
>         Environment: Redhat 6.4 (X64)
> Cloudera Hadoop 5.4.2
> NiFi 0.4.1
>            Reporter: Obaidul Karim
>              Labels: extensions, feature, performance
>         Attachments: CSVProcessor_to_HDFS.xml, iostat.txt, iostat2.txt, 
> jstat.txt, jstat2.txt, sample2g.csv.tar.gz
>
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> Hi,
> I was planning to design an ETL flow for Hadoop. While doing that, I felt the 
> need for an efficient line-by-line CSV processor.
> Please check below for details:
> Requirements:
> -------------------
> 1. Source is plain ASCII files (CSV with rows & columns). Fields are 
> comma-separated and some of the columns are double-quoted (")
> 2. Files are pushed to a local directory of the machine where NiFi is 
> installed
> 3. We want to manipulate some of the columns (e.g. masking) before we load 
> the data into HDFS. The business requirement is that anything loaded into 
> Hadoop must be masked.
> 4. There will be 5-6 TB of data per day, and each file will be 1-2 GB in size
> Implemented Solution:
> -----------------------------
> With the above requirements in mind, we have designed the flow below in NiFi:
> ExecuteProcess (touch and mv to input dir) > ListFile (1 thread) > FetchFile 
> (1 thread) > CSVProcessor (4 threads) > PutHDFS (1 thread)
> * "CSVProcessor" is a custom processor. It uses opencsv to parse the CSV and 
> identify columns.
> * I have added some business logic in "CSVProcessor", like masking specific 
> columns.
> * I used 4 threads for "CSVProcessor" and 1 for the others because I found it 
> is the slowest component.
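To make the per-line cost concrete, here is a minimal stdlib-only sketch of the work such a processor does per line (a quote-aware split, then masking fixed column positions). The class and method names are hypothetical, not the reporter's actual code, and unlike opencsv this sketch simply drops the quote characters rather than preserving them:

```java
import java.util.ArrayList;
import java.util.List;

public class CsvMaskSketch {
    // Quote-aware split of one CSV line: commas inside double quotes do not
    // split; the quote characters themselves are dropped in this sketch.
    static List<String> splitLine(String line) {
        List<String> cols = new ArrayList<>();
        StringBuilder cur = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == '"') {
                inQuotes = !inQuotes;
            } else if (c == ',' && !inQuotes) {
                cols.add(cur.toString());
                cur.setLength(0);
            } else {
                cur.append(c);
            }
        }
        cols.add(cur.toString());
        return cols;
    }

    // Replace the listed column positions with a fixed mask token,
    // then rejoin the line.
    static String maskLine(String line, int[] positions, String mask) {
        List<String> cols = splitLine(line);
        for (int p : positions) {
            if (p < cols.size()) cols.set(p, mask);
        }
        return String.join(",", cols);
    }

    public static void main(String[] args) {
        // Masks column 0 while leaving the quoted field intact.
        System.out.println(maskLine("alice,\"x,y\",42", new int[] {0}, "***"));
    }
}
```

Allocating a fresh list and builder for every line, as above, is exactly the kind of per-line object churn that can dominate when files are 1-2 GB each.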
> Outcome:
> -------------
> 1. With the flow above, I was able to load 110 GB of files in 90 minutes.
> 2. CSVProcessor with a single thread can process a 1 GB file in about 4 
> minutes, which is really slow. It needs some improvement.
> Observations:
> ------------------
> In order to investigate the slowness of CSVProcessor we followed the steps 
> below:
> 1. Initially we tried the above flow with the default heap size (in 
> conf/bootstrap.conf):
> java.arg.2=-Xms512m
> java.arg.3=-Xmx512m
> With this configuration we checked:
> - "jstat -gcutil <PID of NiFi found from jps> 1000" 
> - "iostat xmh 1"
> [check attached iostat.txt & jstat.txt]
> We found that garbage collection was slow due to an undersized Java heap. 
> CPU & I/O had no issues.
> 2. Then we increased the heap size as below:
> java.arg.2=-Xms5120m
> java.arg.3=-Xmx10240m
> And checked the output of jstat & iostat again. This time no problem was 
> found with heap size, I/O or CPU. [check attached iostat2.txt & jstat2.txt]
> However, this processor (CSVProcessor) is still as slow as before. Almost no 
> improvement.
> For details, please go through the NiFi users mailing list thread: 
> http://apache-nifi.1125220.n5.nabble.com/Data-Ingestion-forLarge-Source-Files-and-Masking-td2535.html
> Proposal:
> -----------
> - Please find the attached NiFi flow template [CSVProcessor_to_HDFS.xml]
> - Please find my CSVProcessor code from github 
> [https://github.com/obaidcuet/apache-nifi/tree/master/csvprocessor]
> - Please find the sample CSV file [it is not the actual file; I manually 
> regenerated it by copying the same line]
> Proposing a faster CSV processor with the requirements below:
> a. It can read any ASCII/UTF-8 CSV file and identify columns
> b. It should typically be able to parse and process at least 2 GB per minute
> c. There should be pluggable functionality. For example, I want to mask 
> specific columns at positions [0, 5 & 7] with my already-built makser.jar.
> d. You are welcome to reuse/modify my code [github link above]
> e. In order to make it faster we may use some in-memory batch processing. 
> Example: we load each file into some in-memory storage and batch-update 
> specific columns to meet the business requirements.
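As a rough illustration of point (e), assuming the bottleneck is per-line processing overhead rather than disk I/O, a streaming approach with large read/write buffers might look like the following. All names here are hypothetical, and the split naively ignores quoting to keep the sketch short:

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;

public class StreamingMasker {
    // Mask the given column positions in one line. Naive split that
    // ignores quoting, kept short for illustration only.
    static String mask(String line, int[] positions) {
        String[] cols = line.split(",", -1);
        for (int p : positions) {
            if (p < cols.length) cols[p] = "***";
        }
        return String.join(",", cols);
    }

    // Stream input to output with large buffers so that per-line work,
    // not I/O, dominates; one thread per file, files done in parallel.
    static void process(BufferedReader r, BufferedWriter w, int[] positions)
            throws IOException {
        String line;
        while ((line = r.readLine()) != null) {
            w.write(mask(line, positions));
            w.write('\n');
        }
        w.flush();
    }

    public static void main(String[] args) throws IOException {
        BufferedReader r =
                new BufferedReader(new StringReader("a,b,c\nd,e,f\n"), 1 << 20);
        StringWriter sw = new StringWriter();
        process(r, new BufferedWriter(sw, 1 << 20), new int[] {1});
        System.out.print(sw); // column 1 masked in every line
    }
}
```

Streaming like this keeps memory flat regardless of file size; whether full in-memory loading would beat it depends on heap headroom, so both variants would be worth benchmarking against the 2 GB/minute target.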
> Regards,
> Obaid



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
