Junegunn Choi created NIFI-5064:

             Summary: Fixes and improvements to PutKudu processor
                 Key: NIFI-5064
                 URL: https://issues.apache.org/jira/browse/NIFI-5064
             Project: Apache NiFi
          Issue Type: Improvement
    Affects Versions: 1.6.0
            Reporter: Junegunn Choi

1. Currently, PutKudu fails with an NPE when a record has null or missing values.
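Here is a minimal sketch of the null check for item 1. It uses a hypothetical {{Row}} interface (not part of Kudu or NiFi) that stands in for {{PartialRow}*, whose real {{setNull(String)}} method corresponds to {{setNull}} here. The record is modeled as a plain {{Map}}, which is an assumption made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NullSafeRowWriter {

    // Hypothetical stand-in for the two PartialRow setters this sketch needs.
    interface Row {
        void setNull(String column);

        void addInt(String column, int value);
    }

    // Test double that records each setter call instead of writing to Kudu.
    static class RecordingRow implements Row {
        final List<String> calls = new ArrayList<>();

        @Override
        public void setNull(String column) {
            calls.add("setNull(" + column + ")");
        }

        @Override
        public void addInt(String column, int value) {
            calls.add("addInt(" + column + "," + value + ")");
        }
    }

    // Map#get returns null both for a missing key and for an explicit null value,
    // so one check before unboxing covers both cases that lead to the NPE.
    static void writeIntColumn(Row row, Map<String, Object> record, String column) {
        Object value = record.get(column);
        if (value == null) {
            row.setNull(column);
        } else {
            row.addInt(column, ((Number) value).intValue());
        }
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("id", 7);
        record.put("score", null); // explicit null; "age" is missing entirely
        RecordingRow row = new RecordingRow();
        for (String column : new String[] {"id", "score", "age"}) {
            writeIntColumn(row, record, column);
        }
        System.out.println(row.calls); // [addInt(id,7), setNull(score), setNull(age)]
    }
}
```

In Kudu, {{PartialRow#setNull}} throws an {{IllegalArgumentException}} for a non-nullable column. A null in such a column would therefore still need to be reported as a failure.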

2. {{IllegalArgumentException}} on 16-bit integer columns because of a missing 
{{break}} in the case clause for INT16.

3. Also, {{IllegalArgumentException}} on 8-bit integer columns. We need a 
separate case clause for INT8 columns in which {{PartialRow#addByte}} is used 
instead of {{PartialRow#addShort}}.
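The fix for items 2 and 3 comes down to giving every integer type its own {{case}} that ends in {{break}}. The sketch below uses a hypothetical {{ColumnType}} enum. Instead of calling Kudu, it records the name of the {{PartialRow}} setter that would be called.

```java
import java.util.ArrayList;
import java.util.List;

public class TypedColumnWriter {

    // Hypothetical mirror of Kudu's integer column types.
    enum ColumnType { INT8, INT16, INT32, INT64 }

    // Appends the PartialRow setter that would be called: exactly one per column.
    static void write(List<String> calls, ColumnType type, String column, Number value) {
        switch (type) {
            case INT8:
                // 8-bit columns need their own case: addShort is rejected for INT8.
                calls.add("addByte(" + column + "," + value.byteValue() + ")");
                break;
            case INT16:
                calls.add("addShort(" + column + "," + value.shortValue() + ")");
                break; // without this break, INT16 values fall through to addInt
            case INT32:
                calls.add("addInt(" + column + "," + value.intValue() + ")");
                break;
            case INT64:
                calls.add("addLong(" + column + "," + value.longValue() + ")");
                break;
            default:
                throw new IllegalArgumentException("Unsupported column type: " + type);
        }
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        write(calls, ColumnType.INT8, "tiny", 5);
        write(calls, ColumnType.INT16, "small", 300);
        System.out.println(calls); // [addByte(tiny,5), addShort(small,300)]
    }
}
```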

4. NIFI-4384 added a batch size parameter; however, it only applies to FlowFiles 
with multiple records. A {{KuduSession}} is created and closed for each FlowFile, 
so if a FlowFile contains only a single record, no batching takes place. A 
workaround would be to use an upstream processor to concatenate multiple FlowFiles, 
but since {{PutHBase}} and {{PutSQL}} use {{session.get(batchSize)}} to handle 
multiple FlowFiles at once, I think we can take the same approach with 
PutKudu, as it simplifies the data flow.
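This is a rough model of the batching proposed in item 4. Each trigger pulls up to {{batchSize}} FlowFiles, as {{session.get(batchSize)}} does in {{PutHBase}} and {{PutSQL}}, and writes all their records through one {{KuduSession}}. The queue, the {{get}} helper and {{recordsPerSession}} are hypothetical stand-ins, and each FlowFile is modeled as a list of records.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

public class FlowFileBatching {

    // Mimics ProcessSession#get(int): removes up to maxResults FlowFiles from the queue.
    static <T> List<T> get(Deque<T> queue, int maxResults) {
        List<T> batch = new ArrayList<>();
        while (batch.size() < maxResults && !queue.isEmpty()) {
            batch.add(queue.poll());
        }
        return batch;
    }

    // Returns the number of operations applied per (simulated) KuduSession.
    static List<Integer> recordsPerSession(Deque<List<String>> flowFiles, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<Integer> perSession = new ArrayList<>();
        while (!flowFiles.isEmpty()) {
            // One KuduSession would be opened here and shared by the whole batch.
            int applied = 0;
            for (List<String> records : get(flowFiles, batchSize)) {
                applied += records.size(); // one insert per record
            }
            perSession.add(applied); // flush and close the session once per batch
        }
        return perSession;
    }

    public static void main(String[] args) {
        Deque<List<String>> queue = new ArrayDeque<>();
        for (int i = 0; i < 10; i++) {
            queue.add(Collections.singletonList("record-" + i)); // single-record FlowFiles
        }
        System.out.println(recordsPerSession(queue, 4)); // [4, 4, 2]
    }
}
```

With ten single-record FlowFiles and a batch size of 4, this produces three sessions instead of ten sessions of one record each.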

5. {{PutKudu}} depends on kudu-client 1.3.0, but we can safely update to 1.7.0.
 - [https://github.com/apache/kudu/blob/1.7.0/docs/release_notes.adoc]
 - [https://github.com/apache/kudu/blob/1.7.0/docs/prior_release_notes.adoc]

A notable change in Kudu 1.7.0 is the addition of the Decimal type.

6. {{PutKudu}} has a {{Skip head line}} property for ignoring the first record in 
a FlowFile. I suppose this was added to handle header lines in CSV files, but I 
really don't think it's something {{PutKudu}} should handle. {{CSVReader}} 
already has a {{Treat First Line as Header}} option, so we should tell users 
to use that instead of duplicating the same option across processors. 
Also, the default value of {{Skip head line}} is {{true}}, which I found very 
confusing since my use case was to stream-process single-record FlowFiles: 
with that default, the only record in each FlowFile is skipped. We can 
keep this property for backward compatibility, but we should at least deprecate 
it and change the default value to {{false}}.
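The toy model below shows why a default of {{true}} is surprising for single-record FlowFiles. It follows the {{Skip head line}} behavior described above. {{recordsToWrite}} is a hypothetical helper, not the processor's code.

```java
import java.util.Collections;
import java.util.List;

public class SkipHeadLineDemo {

    // Models the Skip head line property: drop the first record of each FlowFile.
    static List<String> recordsToWrite(List<String> flowFileRecords, boolean skipHeadLine) {
        if (skipHeadLine && !flowFileRecords.isEmpty()) {
            return flowFileRecords.subList(1, flowFileRecords.size());
        }
        return flowFileRecords;
    }

    public static void main(String[] args) {
        List<String> single = Collections.singletonList("id=1");
        // Current default (true): a single-record FlowFile writes nothing.
        System.out.println(recordsToWrite(single, true));  // []
        // Proposed default (false): the record is written; CSV headers are left to CSVReader.
        System.out.println(recordsToWrite(single, false)); // [id=1]
    }
}
```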

7. Server-side errors such as uniqueness constraint violations are not checked 
and are simply ignored. When the flush mode is set to {{AUTO_FLUSH_SYNC}}, we should 
check the return value of {{KuduSession#apply}} to see whether it has a {{RowError}}, but 
PutKudu currently ignores it. For example, on a uniqueness constraint violation, 
we get a {{RowError}} saying "_Already present: key already present (error 0)_".
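Here is a minimal sketch of the {{AUTO_FLUSH_SYNC}} check from item 7. {{Response}} and {{FakeSyncSession}} are hypothetical stand-ins. They mimic {{OperationResponse#hasRowError()}}/{{getRowError()}} and a table whose primary key rejects duplicates. The error text is the one quoted above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SyncFlushErrorCheck {

    // Stand-in for Kudu's OperationResponse.
    static class Response {
        final String rowError; // null when the operation succeeded

        Response(String rowError) {
            this.rowError = rowError;
        }

        boolean hasRowError() {
            return rowError != null;
        }

        String getRowError() {
            return rowError;
        }
    }

    // Stand-in for a KuduSession in AUTO_FLUSH_SYNC mode: apply() writes
    // immediately and reports a per-row failure in its return value.
    static class FakeSyncSession {
        private final Set<String> keys = new HashSet<>();

        Response apply(String key) {
            return keys.add(key)
                    ? new Response(null)
                    : new Response("Already present: key already present (error 0)");
        }
    }

    // Applies every key and collects row errors instead of silently dropping them.
    static List<String> insertAll(FakeSyncSession session, List<String> keys) {
        List<String> errors = new ArrayList<>();
        for (String key : keys) {
            Response response = session.apply(key);
            if (response.hasRowError()) {
                errors.add(key + ": " + response.getRowError());
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        List<String> errors = insertAll(new FakeSyncSession(), Arrays.asList("a", "b", "a"));
        System.out.println(errors); // [a: Already present: key already present (error 0)]
    }
}
```

In the processor, a non-empty error list would presumably send the FlowFile to failure instead of success.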

On the other hand, when the flush mode is set to {{AUTO_FLUSH_BACKGROUND}}, 
{{KuduSession#apply}}, understandably, returns null, and we should check the 
return value of {{KuduSession#getPendingErrors()}} instead. When the mode is 
{{MANUAL_FLUSH}}, we should examine the return value of {{KuduSession#flush()}} 
or {{KuduSession#close()}}. In this case, we also have to make sure that we 
don't overflow the mutation buffer of {{KuduSession}} by calling {{flush()}} 
before the buffer fills up.
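For {{MANUAL_FLUSH}}, the sketch below flushes whenever the buffer is full and collects errors from every flush, including the final one. {{FakeManualSession}} is a hypothetical stand-in. Its {{bufferSpace}} plays the role of the limit set with {{KuduSession#setMutationBufferSpace}}. Where Kudu's {{flush()}} returns a list of {{OperationResponse}}s, this one returns error strings. The {{AUTO_FLUSH_BACKGROUND}} path ({{getPendingErrors()}}) is not modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ManualFlushWriter {

    // Stand-in for a KuduSession in MANUAL_FLUSH mode: operations are buffered
    // until flush(), and apply() fails once the mutation buffer is full.
    static class FakeManualSession {
        final int bufferSpace;
        final List<String> pending = new ArrayList<>();
        final Set<String> stored = new HashSet<>();
        int flushes = 0;

        FakeManualSession(int bufferSpace) {
            this.bufferSpace = bufferSpace;
        }

        void apply(String key) {
            if (pending.size() >= bufferSpace) {
                throw new IllegalStateException("mutation buffer is full");
            }
            pending.add(key);
        }

        // Returns one message per failed row, like inspecting flush()'s responses.
        List<String> flush() {
            flushes++;
            List<String> errors = new ArrayList<>();
            for (String key : pending) {
                if (!stored.add(key)) {
                    errors.add(key + ": Already present");
                }
            }
            pending.clear();
            return errors;
        }
    }

    // Flushes before the buffer overflows and collects row errors from every flush.
    static List<String> insertAll(FakeManualSession session, List<String> keys) {
        List<String> errors = new ArrayList<>();
        for (String key : keys) {
            if (session.pending.size() >= session.bufferSpace) {
                errors.addAll(session.flush());
            }
            session.apply(key);
        }
        // Final flush; in Kudu, close() also flushes and returns the responses.
        errors.addAll(session.flush());
        return errors;
    }

    public static void main(String[] args) {
        FakeManualSession session = new FakeManualSession(2);
        List<String> errors = insertAll(session, Arrays.asList("a", "b", "c", "a", "d"));
        System.out.println(errors + " after " + session.flushes + " flushes");
        // [a: Already present] after 3 flushes
    }
}
```

With a two-operation buffer and the keys a, b, c, a, d, the writer flushes three times and never overflows the buffer. It reports the duplicate a from the second flush.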

I'll create a pull request on GitHub. Since there are multiple issues to 
address, I made a separate commit for each issue mentioned above so that it's 
easier to review. You might want to squash them into one, or cherry-pick a 
subset of commits if you don't agree with some of my decisions.

Please let me know what you think. We deployed the code to a production server 
last week, and it has been running steadily since then without any issues, 
processing 20K records/second.

This message was sent by Atlassian JIRA