[
https://issues.apache.org/jira/browse/NIFI-1432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Obaidul Karim updated NIFI-1432:
--------------------------------
Attachment: jstat2.txt
jstat.txt
iostat2.txt
iostat.txt
CSVProcessor_to_HDFS.xml
Please find the attachments.
> Efficient line by line csv processor
> ------------------------------------
>
> Key: NIFI-1432
> URL: https://issues.apache.org/jira/browse/NIFI-1432
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Core Framework
> Affects Versions: 0.4.1
> Environment: Redhat 6.4 (X64)
> Cloudera Hadoop 5.4.2
> NiFi 0.4.1
> Reporter: Obaidul Karim
> Labels: extensions, feature, performance
> Attachments: CSVProcessor_to_HDFS.xml, iostat.txt, iostat2.txt,
> jstat.txt, jstat2.txt
>
> Original Estimate: 672h
> Remaining Estimate: 672h
>
> Hi,
> I was planning to design an ETL flow for Hadoop. While doing that, I felt the need
> for an efficient line-by-line CSV processor.
> Please check below for details:
> Requirements:
> -------------------
> 1. Source is plain ASCII files (CSV with rows & columns). Fields are comma separated,
> and some of the columns are double quoted (").
> 2. Files are being pushed to a local directory of a machine where NiFi is
> installed.
> 3. We want to manipulate some of the columns (e.g. masking) before we load the
> data into HDFS. The business requirement is that anything loaded into Hadoop
> must be masked.
> 4. There will be 5-6 TB of data per day, and each file will be 1-2 GB in size.
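Requirement 1 (comma-separated fields, some double quoted) is the part a CSV parser such as opencsv handles. As a minimal, self-contained sketch of the quote-aware splitting involved (class and method names here are hypothetical, not taken from the attached code):

```java
import java.util.ArrayList;
import java.util.List;

public class CsvLine {
    // Split one CSV line into fields, honoring double-quoted fields that may
    // contain commas; "" inside a quoted field is an escaped quote character.
    public static List<String> split(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder cur = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (inQuotes) {
                if (c == '"') {
                    if (i + 1 < line.length() && line.charAt(i + 1) == '"') {
                        cur.append('"');   // escaped quote inside quoted field
                        i++;
                    } else {
                        inQuotes = false;  // closing quote
                    }
                } else {
                    cur.append(c);
                }
            } else if (c == '"') {
                inQuotes = true;           // opening quote
            } else if (c == ',') {
                fields.add(cur.toString());
                cur.setLength(0);
            } else {
                cur.append(c);
            }
        }
        fields.add(cur.toString());
        return fields;
    }
}
```

A library like opencsv does this plus configurable separators and escapes; the sketch only shows why quoted fields cannot be handled by a plain split on commas.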
> Implemented Solution:
> -----------------------------
> With the above requirements in mind we have designed below flow on NiFi:
> ExecuteProcess(touch and mv to input dir) > ListFile (1 thread) > FetchFile
> (1 thread) > CSVProcessor(4 threads) > PutHDFS (1 thread)
> * "CSVProcessor" is a custom processor. It uses opencsv to parse the CSV and
> identify columns.
> * I have added some business logic in "CSVProcessor", like masking specific
> columns.
> * I used 4 threads for "CSVProcessor" and 1 for the others because I found it to
> be the slowest component.
> Outcome:
> -------------
> 1. With the flow above, I was able to load 110 GB of files in 90 minutes.
> 2. CSVProcessor with a single thread can process a 1 GB file in about 4 minutes,
> which is really slow. Some improvement is needed here.
> Observations:
> ------------------
> In order to investigate the slowness of CSVProcessor, we followed the steps below:
> 1. Initially we tried the above flow with the default heap size below (in
> conf/bootstrap.conf):
> java.arg.2=-Xms512m
> java.arg.3=-Xmx512m
> With this configuration we checked:
> - "jstat -gcutil <PID of NiFI found from jps> 1000"
> - "iostat xmh 1"
> [check attached iostat.txt & jstat.txt]
> We found that garbage collection was slow due to an undersized Java heap.
> CPU & I/O showed no issues.
> 2. Then we increased the heap size as below:
> java.arg.2=-Xms5120m
> java.arg.3=-Xmx10240m
> And checked the output of jstat & iostat again. This time no problems were found
> with heap size, I/O or CPU. [check attached iostat2.txt & jstat2.txt]
> However, the processor (CSVProcessor) is still as slow as before; there is almost
> no improvement.
> For details, please see this thread on the NiFi users mailing list:
> http://apache-nifi.1125220.n5.nabble.com/Data-Ingestion-forLarge-Source-Files-and-Masking-td2535.html
> Proposal:
> -----------
> - Please find the attached NiFi flow template [CSVProcessor_to_HDFS.xml]
> - Please find my CSVProcessor code from github
> [https://github.com/obaidcuet/apache-nifi/tree/master/csvprocessor]
> - Please find the sample CSV file [it is not the actual file; I regenerated it
> manually by copying the same line]
> Proposing a faster CSV processor with the requirements below:
> a. It can read any ASCII/UTF-8 CSV file and identify columns.
> b. It should typically be able to parse and process at least 2 GB per minute.
> c. There should be pluggable functionality. For example, I want to mask
> specific columns at positions [0, 5 & 7] with my already-built jar, makser.jar.
> d. You are welcome to reuse/modify my code [github link above].
> e. In order to make it faster we may use some in-memory batch processing.
> For example: load each file into some in-memory storage and batch-update the
> specific columns to meet the business requirements.
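The batch idea in (e) could also be approached as pure streaming, which keeps memory flat even for 1-2 GB files. A rough sketch, assuming masking simply replaces a field with a fixed token (all names are hypothetical, and the naive comma split ignores quoting for brevity; a real processor would use the quote-aware parsing described above):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Writer;
import java.util.Set;

public class StreamingMasker {
    // Stream the input line by line and mask the columns at the given
    // positions, so memory use stays constant regardless of file size.
    // Naive split(",") for brevity; quoted fields need a real CSV parser.
    public static void mask(BufferedReader in, Writer out, Set<Integer> cols)
            throws IOException {
        String line;
        while ((line = in.readLine()) != null) {
            String[] fields = line.split(",", -1);
            for (int i : cols) {
                if (i < fields.length) {
                    fields[i] = "****"; // placeholder masking rule
                }
            }
            out.write(String.join(",", fields));
            out.write('\n');
        }
    }
}
```

In a NiFi processor, the same loop would run inside the session's stream callbacks so the FlowFile content is never held fully in memory.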
> Regards,
> Obaid
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)