Junegunn Choi created NIFI-5064:
-----------------------------------
Summary: Fixes and improvements to PutKudu processor
Key: NIFI-5064
URL: https://issues.apache.org/jira/browse/NIFI-5064
Project: Apache NiFi
Issue Type: Improvement
Affects Versions: 1.6.0
Reporter: Junegunn Choi
1. Currently, PutKudu fails with NPE on null or missing values.
2. {{IllegalArgumentException}} on 16-bit integer columns because of [a missing
{{break}} in case clause for INT16
columns|https://github.com/apache/nifi/blob/rel/nifi-1.6.0/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java#L112-L115].
3. Also, {{IllegalArgumentException}} on 8-bit integer columns. We need a
separate case clause for INT8 columns where {{PartialRow#addByte}} is used
instead of {{PartialRow#addShort}}.
4. NIFI-4384 added a batch size parameter, but it only applies to FlowFiles
with multiple records. A {{KuduSession}} is created and closed for each
FlowFile, so if a FlowFile contains only a single record, no batching takes
place. A workaround would be to use a preprocessor to concatenate multiple
FlowFiles, but since {{PutHBase}} and {{PutSQL}} use
{{session.get(batchSize)}} to handle multiple FlowFiles at once, I think we can
take the same approach in PutKudu, as it simplifies the data flow.
5. {{PutKudu}} depends on kudu-client 1.3.0. But we can safely update to 1.7.0.
- [https://github.com/apache/kudu/blob/1.7.0/docs/release_notes.adoc]
- [https://github.com/apache/kudu/blob/1.7.0/docs/prior_release_notes.adoc]
A notable change in Kudu 1.7.0 is the addition of Decimal type.
6. {{PutKudu}} has a {{Skip head line}} property for ignoring the first record
in a FlowFile. I suppose this was added to handle header lines in CSV files,
but I really don't think it's something {{PutKudu}} should handle.
{{CSVReader}} already has a {{Treat First Line as Header}} option, so we should
tell users to use it instead of duplicating the same option in both places.
Also, the default value of {{Skip head line}} is {{true}}, and I found it very
confusing as my use case was to stream-process single-record FlowFiles. We can
keep this property for backward compatibility, but we should at least deprecate
it and change the default value to {{false}}.
7. Server-side errors such as uniqueness constraint violation are not checked
and simply ignored. When flush mode is set to {{AUTO_FLUSH_SYNC}}, we should
check the return value of {{KuduSession#apply}} to see whether it has a {{RowError}}, but
PutKudu currently ignores it. For example, on uniqueness constraint violation,
we get a {{RowError}} saying "_Already present: key already present (error 0)_".
On the other hand, when flush mode is set to {{AUTO_FLUSH_BACKGROUND}},
{{KuduSession#apply}}, understandably, returns null, and we should check the
return value of {{KuduSession#getPendingErrors()}}. And when the mode is
{{MANUAL_FLUSH}}, we should examine the return value of {{KuduSession#flush()}}
or {{KuduSession#close()}}. In this case, we also have to make sure that we
call {{flush()}} before the mutation buffer of {{KuduSession}} overflows.
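To illustrate items 2 and 3, here is a minimal, self-contained sketch of the fall-through problem and of a fix that gives each integer width its own case clause. It is not the PutKudu code: {{RowSketch}} and {{ColType}} are hypothetical stand-ins for {{PartialRow}} and the Kudu column type, and the assumption that the INT16 case falls through into the INT32 case is made only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins only; none of these are real Kudu or NiFi classes.
final class FallThroughSketch {

    enum ColType { INT8, INT16, INT32 }

    /** Records setter calls and rejects a setter that does not match the column type. */
    static final class RowSketch {
        final ColType type;
        final List<String> calls = new ArrayList<>();

        RowSketch(ColType type) { this.type = type; }

        void addByte(String col, byte v)   { require(ColType.INT8, col);  calls.add("addByte"); }
        void addShort(String col, short v) { require(ColType.INT16, col); calls.add("addShort"); }
        void addInt(String col, int v)     { require(ColType.INT32, col); calls.add("addInt"); }

        private void require(ColType expected, String col) {
            if (type != expected) {
                throw new IllegalArgumentException(col + " is " + type + ", not " + expected);
            }
        }
    }

    /** Assumed shape of the bug: INT8 shares the INT16 case, which has no break. */
    static void writeBuggy(RowSketch row, String col, long value) {
        switch (row.type) {
            case INT8:
            case INT16:
                row.addShort(col, (short) value);
                // no break: execution falls through into the next case
            case INT32:
                row.addInt(col, (int) value);
                break;
            default:
                throw new IllegalArgumentException("unsupported type " + row.type);
        }
    }

    /** One case clause per width, each terminated by break. */
    static void writeFixed(RowSketch row, String col, long value) {
        switch (row.type) {
            case INT8:
                row.addByte(col, (byte) value);
                break;
            case INT16:
                row.addShort(col, (short) value);
                break;
            case INT32:
                row.addInt(col, (int) value);
                break;
            default:
                throw new IllegalArgumentException("unsupported type " + row.type);
        }
    }

    public static void main(String[] args) {
        for (ColType t : ColType.values()) {
            RowSketch fixed = new RowSketch(t);
            writeFixed(fixed, "c", 1L);
            String buggy;
            try {
                RowSketch r = new RowSketch(t);
                writeBuggy(r, "c", 1L);
                buggy = r.calls.toString();
            } catch (IllegalArgumentException e) {
                buggy = "IllegalArgumentException";
            }
            System.out.println(t + ": fixed=" + fixed.calls + ", buggy=" + buggy);
        }
    }
}
```

In this model, the buggy variant fails for both INT8 and INT16 rows and only succeeds for INT32, while the fixed variant calls exactly one matching setter per column.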
----
I'll create a pull request on GitHub. Since there are multiple issues to be
addressed, I made a separate commit for each issue mentioned above so that it's
easier to review. You might want to squash them into one, or cherry-pick a
subset of commits if you don't agree with some decisions I made.
Please let me know what you think. We deployed the code to a production server
last week, and it has been running since without any issues, steadily
processing 20K records/second.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)