[ https://issues.apache.org/jira/browse/NIFI-4026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16496981#comment-16496981 ]

Mark Payne commented on NIFI-4026:
----------------------------------

[~pvillard] thanks for bringing this up - it's a nice feature to have. I do 
think we need to be careful about a few things, though:

I would not try to allow for any sort of scripting language to be used for a 
hashing function. Instead, we should just do something like 
`attributeOfInterest.hashCode() % numberOfNodes`. If scripting is needed, users 
could use ExecuteScript or something similar to generate the value to use for 
that 'attribute of interest'. We certainly do not want to add any dependencies 
on scripting languages into the framework, and even offering scripting as an 
option would, I think, complicate the user experience more than it needs to.
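
The simple scheme above can be sketched as follows. The names here 
(attributeOfInterest, numberOfNodes) are illustrative, not NiFi API; note that 
Java's hashCode() can be negative, so Math.floorMod is used rather than a 
plain % to keep the index in range:

```java
// Sketch of attribute-based node selection: hash the attribute value,
// reduce it modulo the cluster size to pick a destination node index.
public class NodePartitioner {

    // Math.floorMod guards against negative hashCode() values,
    // which a plain % operator would map to a negative index.
    public static int nodeIndexFor(String attributeOfInterest, int numberOfNodes) {
        return Math.floorMod(attributeOfInterest.hashCode(), numberOfNodes);
    }

    public static void main(String[] args) {
        // The same attribute value always maps to the same node index.
        System.out.println(nodeIndexFor("host-1.example.com", 4));
    }
}
```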

I do not know that this would necessarily result in a batch of 1 - we could 
certainly create multiple transactions for multiple destination nodes.
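
One way to avoid a batch size of 1 is to bin flow files by their computed 
destination index and open one transaction per bin. A minimal sketch, assuming 
a stand-in FlowFile record rather than the actual NiFi FlowFile interface:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch of binning flow files by destination node so that one
// transaction per node can be created instead of a batch of 1.
public class TransactionBinner {

    // Stand-in for a flow file carrying the partitioning attribute's value.
    record FlowFile(String partitionAttribute) {}

    // Group flow files by the node index their attribute hashes to.
    static Map<Integer, List<FlowFile>> binByNode(List<FlowFile> flowFiles, int numberOfNodes) {
        return flowFiles.stream().collect(Collectors.groupingBy(
                ff -> Math.floorMod(ff.partitionAttribute().hashCode(), numberOfNodes)));
    }

    public static void main(String[] args) {
        List<FlowFile> batch = List.of(
                new FlowFile("a"), new FlowFile("b"), new FlowFile("a"));
        // Flow files with the same attribute land in the same bin,
        // so they can travel together in one site-to-site transaction.
        System.out.println(binByNode(batch, 3));
    }
}
```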

Regarding node scaling: I am okay, at least initially, saying that scaling up 
or down may result in 'rebalancing' the calculation of which node belongs to 
which hash values, as long as this is well documented. However, a case that I 
do think is important to solve and cannot be ignored is the case of nodes 
connecting and disconnecting. This occurs much more frequently than nodes being 
added to / removed from a cluster. To handle this properly, I think the 
site-to-site protocol would have to be updated so that when a client asks for 
the nodes in a cluster, the response would need to include all nodes, 
regardless of their Connection Status, but would also need to include the 
nodes' Connection Statuses so that the client doesn't try to send data to a 
disconnected node. But by including Connecting, Disconnecting, and Disconnected 
nodes, we can avoid the case of sending data to the wrong destination just 
because a node in the destination cluster temporarily disconnected.
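
The shape of that response might look something like the sketch below. These 
types are illustrative only, not the actual NiFi site-to-site protocol 
classes: every node appears in the topology (so hash buckets stay stable), 
but only CONNECTED nodes are eligible to receive data.

```java
import java.util.List;

// Sketch of a cluster-topology response that includes every node with its
// Connection Status, so the client hashes over the full topology but only
// sends to connected nodes.
public class ClusterTopologyExample {

    enum ConnectionStatus { CONNECTED, CONNECTING, DISCONNECTING, DISCONNECTED }

    record NodeInfo(String nodeIdentifier, ConnectionStatus status) {}

    // All nodes count toward the hash space; only connected ones receive data.
    static boolean canReceiveData(NodeInfo node) {
        return node.status() == ConnectionStatus.CONNECTED;
    }

    public static void main(String[] args) {
        List<NodeInfo> topology = List.of(
                new NodeInfo("node-1", ConnectionStatus.CONNECTED),
                new NodeInfo("node-2", ConnectionStatus.DISCONNECTED),
                new NodeInfo("node-3", ConnectionStatus.CONNECTED));
        // Hash space spans all 3 nodes, even though node-2 is disconnected.
        System.out.println("hash space size = " + topology.size());
        topology.forEach(n ->
                System.out.println(n.nodeIdentifier() + " sendable=" + canReceiveData(n)));
    }
}
```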

Additionally, I think it should also be a requirement that in order to do this, 
when a node is notified of a change to the cluster topology, instead of just 
updating an in-memory representation of the cluster topology, the node will 
need to persist this information. This is needed to address the case where the 
destination cluster is restarted. If the destination cluster has, say, 10 
nodes and they are restarted, then a client may request the cluster topology 
and find that there are only 2 nodes in the cluster. Currently that's okay: 
the client will send data to those 2 nodes, realize a minute or so later that 
there are now 6 nodes, then 8 nodes, and finally 10 nodes. However, this will 
cause a lot of problems for a use case where we actually need to know the size 
of the whole cluster. So if we persist this information on the nodes, then even 
upon restart we can know how many nodes to expect in the cluster.
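
Persisting the topology could be as simple as writing the known node 
identifiers to a local file whenever a topology-change notification arrives. 
The file name and one-identifier-per-line format below are assumptions for 
illustration, not how NiFi actually stores cluster state:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Sketch of persisting the last-known cluster topology to disk so that a
// restarted node still knows the expected cluster size.
public class TopologyStore {

    // Write one node identifier per line; rewritten on every topology change.
    static void persist(Path file, List<String> nodeIdentifiers) throws IOException {
        Files.write(file, nodeIdentifiers);
    }

    // On restart, the persisted list tells us how many nodes to expect,
    // even if only a couple of them have reconnected so far.
    static List<String> restore(Path file) throws IOException {
        return Files.exists(file) ? Files.readAllLines(file) : List.of();
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("cluster-topology", ".txt");
        persist(file, List.of("node-1", "node-2", "node-3"));
        System.out.println("expected cluster size after restart: " + restore(file).size());
    }
}
```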

Finally, one implementation detail that I think is worth noting here - once we 
know the nodes in the cluster, it's important that we sort those nodes by their 
Node Identifiers, before trying to hash an attribute to determine which node to 
send the data to. This will ensure that even if the client is restarted, we are 
able to tie a hash value to the correct node (as opposed to just using a 
List<Node> that could be populated in a different order each time that we 
determine the nodes in a cluster).
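
That ordering detail can be sketched as below: sorting by Node Identifier 
before indexing makes the attribute-to-node mapping deterministic, regardless 
of the order in which the node list happened to be populated.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Sketch of stable node selection: sort nodes by Node Identifier before
// mapping a hash value to an index, so the same attribute value always
// resolves to the same node, even across client restarts.
public class StableNodeMapping {

    static String nodeFor(String attributeValue, Collection<String> nodeIdentifiers) {
        List<String> sorted = new ArrayList<>(nodeIdentifiers);
        Collections.sort(sorted); // deterministic order, independent of arrival order
        return sorted.get(Math.floorMod(attributeValue.hashCode(), sorted.size()));
    }

    public static void main(String[] args) {
        // The same nodes in two different arrival orders still map
        // the attribute to the same destination.
        System.out.println(nodeFor("host-a", List.of("node-2", "node-1", "node-3")));
        System.out.println(nodeFor("host-a", List.of("node-3", "node-1", "node-2")));
    }
}
```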

> SiteToSite Partitioning
> -----------------------
>
>                 Key: NIFI-4026
>                 URL: https://issues.apache.org/jira/browse/NIFI-4026
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Pierre Villard
>            Priority: Major
>
> To address some use cases and provide even more flexibility in the 
> Site-to-Site mechanism, it would be interesting to introduce a S2S 
> Partitioning Key.
> The idea would be to add a parameter in the S2S configuration to compute the 
> destination node based on the attribute of a flow file. The user would set 
> the attribute to read from the incoming flow files and a hashing function 
> would be applied on this attribute value to get a number between 1 and N (N 
> being the number of nodes on the remote cluster) to select the destination 
> node.
> It could even be possible to let the user code a custom hashing function in a 
> scripting language.
> This approach would potentially force the “batching” to 1, or it could be 
> necessary to create bins to batch together flow files that are supposed to go 
> to the same node.
> Obviously, this raises the question of how to handle cluster scale 
> up/down. However, I believe this is an edge case and should not block 
> this feature.
> Some of the use cases could be:
> - better load balancing of the flow files when using the List/Fetch pattern 
> (example: ListHDFS/FetchHDFS and load balance based on the size of the remote 
> file to fetch)
> - being able to keep data related to the same element on the same node 
> (based on business requirements; example: all the logs from a given host 
> should be merged into the same file, rather than one file per NiFi node)
> - give the possibility to send all the data back to the primary node (we 
> could say that if the hash function returns 0, then the destination node is 
> the primary node) in case this is required for specific operations. This 
> would avoid the need to do the full workflow on the primary node only when 
> some parts can be load balanced.
> I also think that this work would be a good foundation for the "node 
> labeling" stuff that has been discussed on the mailing lists in the past.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
