Hello,

In general, you probably want to take a look at the "record" processors, which offer a more efficient way of performing this task without needing to split to one message per flow file.

The flow with the record processors would probably be:

GetFile -> ConvertRecord (using CsvReader and AvroWriter) -> PublishKafkaRecord

Regarding your specific questions...

1) All split processors write a standard set of "fragment" attributes, which you can read about in the documentation of the processor. The fragment.identifier will be a unique id for the overall flow file, and fragment.index will be the index of the split within the given fragment.identifier.

2) I think you will need to write a custom script or processor for this validation part. I suppose there could be a generic ValidateFieldLength processor, but it doesn't seem like a common case, and it only applies to fields that are strings, which is a small subset of the possible types.

-Bryan

On Mon, Jun 18, 2018 at 12:41 AM, Dave <[email protected]> wrote:
> Hi,
>
> I am learning NiFi.
>
> I have created an input csv (CityCode.csv) file as below:
> ID, CITY_NAME, ZIP_CD, STATE_CD
> 1, Delhi, 110001, DL
> 2, Mumbai, 400001, MH
> 3, Chennai, 600001, TN
> 4, Bangalore, 560001, KA
>
> This is my 1st dataflow. I am building it block by block and I am planning
> to create a dataflow like this.
> GetFile -> InferAvroSchema -> SplitText -> ConvertCSVToAvro -> ExtractText
> -> if error Put in Kafka
>
> -> if success put in DB
> I might add few more functionalities in between to strengthen my knowledge.
>
> InitialFlow.jpg
> <http://apache-nifi-developer-list.39713.n7.nabble.com/file/t1006/InitialFlow.jpg>
>
> I have created dataflow till ConvertCSVToAvro. I have a few queries in the
> flow till now
>
> I use Getfile processor to take a csv file from a directory
> D:\ApacheNiFi\source-data.
> If getfile is successful, then the flow moves to
> “CreateInferAvroSchema”
> In InferAvroSchema processor, the flow is configured as below:
>
> • Schema Output Destination - flowfile-attribute
> • Input Content Type - CSV
> • CSV Header Definition -
> • Get CSV Header Definition From Data - true
> • CSV Header Line Skip Count – 1
> • CSV delimiter – .
> • CSV Escape String - /
> • CSV Quote String – ‘
> • Pretty Avro Output - true
> • Avro Record Name - CityCode
> • Number of Records To Analyze - 10
> • Charset – UTF8
>
> Scheduling
> Scheduling Strategy - Timer Driven, Concurrent Tasks – 1, Run Schedule – 0
> sec
> Settings
> • I have checked original Relationship to Automatically Terminate
> Relationships because I am not able to understand what exactly is this
> relationship
> • Failure & Unsupported content – Put in file in directory
> “D:\ApacheNiFi\error-data”
> • Success – SplitText
>
> The reason why I used SplitText processor before InferAvroSchema processor
> is that the schema processor is not able to capture records which are only
> failure but send the whole file and add an attribute “error” to failed
> records. In one specific post, it was recommended to first split the records
> and then convert to avro
> https://stackoverflow.com/questions/41840726/nifi-convertcsvtoavro-how-to-capture-the-failed-records
>
> In SplitText Processor, the flow is configured as below:
> Line Split Count - 1
> Header Line Count - 1 (This I have kept as 1 because I have a header
> in my file)
> Remove Trailing Newlines - true
>
> Splits - It flows to next processor “ConvertCSVToAvro”
> Original - I have created a processor Putfile and storing the file in a
> directory by name "D:\ApacheNiFi\processed-data".
> Failure - I am routing it to the same processor
>
> 1st question:
> Is it possible that we can attach some kind of an attribute to distinguish
> every record that is split? For example, is it possible to attach some unique ID
> to each record as an attribute to make it unique? If yes, how can I do that?
> Are there any instructions or material available that will help me to add
> an attribute? I tried to add “UpdateAttribute” processor to check if I can
> achieve this, but could not find anything related.
>
> 2nd question:
> I also need to check if the input string in each field of the record is of
> 35 characters. Only then it should execute the “Split” relation. Else the
> record should be routed to failure.
>
> Any guidance will be very helpful. I hope I am not sounding very stupid.
>
> If there is any material for me to practise these kinds of activities like
> validating based on some conditions or mentioning a filename for capturing
> error records like "InvalidRecords.csv" in the folder mentioned in putfile
> processor. Everything seems so confusing and I am not able to find enough
> material to learn this.
>
> Thanks for your patience and time
>
> Thanks
> Dave
>
> --
> Sent from: http://apache-nifi-developer-list.39713.n7.nabble.com/
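
For the field-length check in question 2, the custom script could look roughly like the sketch below. It is plain, standalone Python and does not use any NiFi API; how it would be wired into a flow is left open. The function name split_by_field_length and the MAX_FIELD_LENGTH constant are made up for illustration. Two assumptions are baked in: "of 35 characters" is read as "at most 35 characters", and a row whose column count differs from the header is also treated as invalid.

```python
import csv
import io

# Assumption: "of 35 characters" is interpreted as "at most 35 characters".
MAX_FIELD_LENGTH = 35


def split_by_field_length(csv_text, max_len=MAX_FIELD_LENGTH):
    """Return (header, valid_rows, invalid_rows) for CSV text with a header row."""
    # skipinitialspace drops the blank after each comma, as in "1, Delhi, 110001, DL".
    reader = csv.reader(io.StringIO(csv_text), skipinitialspace=True)
    header = next(reader)
    valid, invalid = [], []
    for row in reader:
        if not row:
            continue  # ignore blank lines
        # Assumption: a row with the wrong number of columns is also invalid.
        ok = len(row) == len(header) and all(len(field) <= max_len for field in row)
        (valid if ok else invalid).append(row)
    return header, valid, invalid


# Sample data copied from the CityCode.csv shown in the quoted message.
SAMPLE = """ID, CITY_NAME, ZIP_CD, STATE_CD
1, Delhi, 110001, DL
2, Mumbai, 400001, MH
3, Chennai, 600001, TN
4, Bangalore, 560001, KA
"""

if __name__ == "__main__":
    header, valid, invalid = split_by_field_length(SAMPLE)
    print(header)
    print(len(valid), "valid,", len(invalid), "invalid")
```

The valid rows would continue down the normal path and the invalid rows would go to whatever handles failures (for example a file such as "InvalidRecords.csv"). If the rule is really "exactly 35 characters", only the comparison inside all(...) needs to change.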
