[
https://issues.apache.org/jira/browse/NIFI-1645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206718#comment-15206718
]
ASF GitHub Bot commented on NIFI-1645:
--------------------------------------
Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/295#discussion_r57019067
--- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka;
+
+import java.util.Random;
+
+import kafka.producer.Partitioner;
+import kafka.utils.VerifiableProperties;
+
+/**
+ * Collection of implementations of common Kafka {@link Partitioner}s.
+ */
+final public class Partitioners {
+
+ private Partitioners() {
+ }
+ /**
+ * {@link Partitioner} that implements 'round-robin' mechanism which evenly
+ * distributes load between all available partitions.
+ */
+ public static class RoundRobinPartitioner implements Partitioner {
+ private volatile int index;
+
+ public RoundRobinPartitioner(VerifiableProperties props) {
+ }
+
+ @Override
+ public int partition(Object key, int numberOfPartitions) {
+ int partitionIndex = this.next(numberOfPartitions);
+ return partitionIndex;
+ }
+
+ private int next(int numberOfPartitions) {
+ if (index == numberOfPartitions) {
+ index = 0;
+ }
+ int indexToReturn = index++;
+ return indexToReturn;
+ }
+ }
+
+ /**
+ * {@link Partitioner} that implements 'random' mechanism which randomly
+ * distributes the load between all available partitions.
+ */
+ public static class RandomPartitioner implements Partitioner {
+ private final Random random;
+
+ public RandomPartitioner(VerifiableProperties props) {
+ this.random = new Random();
+ }
+
+ @Override
+ public int partition(Object key, int numberOfPartitions) {
+ return this.random.nextInt(numberOfPartitions);
+ }
+ }
+
+ /**
+ * {@link Partitioner} that implements 'key hash' mechanism which
+ * distributes the load between all available partitions based on hashing
+ * the value of the key.
+ */
+ public static class HashPartitioner implements Partitioner {
+ public HashPartitioner(VerifiableProperties props) {
+ }
+
+ @Override
+ public int partition(Object key, int numberOfPartitions) {
+ return (key.hashCode() & Integer.MAX_VALUE) % numberOfPartitions;
--- End diff ---
What is the logic here behind `& Integer.MAX_VALUE`? Since `hashCode()`
will return an int, doing a bit-wise and with `Integer.MAX_VALUE` would just
return the same result as `hashCode()` returned, no?
Also, `key` could be null. Is this going to cause a problem?
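For context on the question above: the mask is not a no-op for every input. Java's `%` operator takes the sign of the dividend, so a negative `hashCode()` would otherwise produce a negative (invalid) partition index; `& Integer.MAX_VALUE` clears the sign bit first. A minimal standalone sketch (the class name `HashMaskDemo` is hypothetical, not part of the PR) demonstrating both points raised in the review:

```java
// Hypothetical demo class, not from the PR; mirrors the HashPartitioner
// logic quoted in the diff above.
public class HashMaskDemo {

    // Same expression as in the diff's HashPartitioner.partition().
    static int maskedPartition(Object key, int numberOfPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numberOfPartitions;
    }

    public static void main(String[] args) {
        // An object with a negative hashCode, to show the mask is not a no-op.
        Object key = new Object() {
            @Override
            public int hashCode() {
                return -7;
            }
        };

        // Java's % keeps the dividend's sign: a raw modulo of a negative
        // hash yields a negative, invalid partition index.
        System.out.println(key.hashCode() % 8);      // -7

        // Masking clears the sign bit: -7 & Integer.MAX_VALUE == 2147483641,
        // and 2147483641 % 8 == 1, a valid index in [0, 8).
        System.out.println(maskedPartition(key, 8)); // 1

        // The reviewer's second concern also holds: a null key would throw
        // NullPointerException, since key.hashCode() is invoked without a
        // null check.
    }
}
```

So the mask matters only when `hashCode()` is negative; for the null-key case the quoted code does need a guard.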
> When using delimited data feature PutKafka ack'd ranges feature can break
> -------------------------------------------------------------------------
>
> Key: NIFI-1645
> URL: https://issues.apache.org/jira/browse/NIFI-1645
> Project: Apache NiFi
> Issue Type: Bug
> Reporter: Oleg Zhurakousky
> Assignee: Oleg Zhurakousky
> Fix For: 0.7.0
>
>
> When using the delimited lines feature to send data to Kafka such that a
> large set of lines that appear to be one 'flowfile' in NiFi is sent as a
> series of 1..N messages in Kafka the mechanism of asynchronous
> acknowledgement can break down whereby we will receive acknowledgements but
> be unable to act on them appropriately because by then the session/data would
> have already been considered successfully transferred. Under certain
> conditions, this could mean that failed acknowledgements would not result in
> a retransmission.
> The logic this processor supports for creating child objects to address
> failed/partial segments is extremely complicated and should likely be
> rewritten to be greatly simplified. Instead the SplitText feature should be
> used to create more manageable chunks of data over which if any segment is
> ack'd as a failure then the whole thing is failed and thus can be
> retransmitted. It is always best to let the user choose between data loss
> and data duplication on their own terms.
> Below is the relevant stack trace
> {code}
> 17:12:37 EDT ERROR 6162d00f-737f-3710-85f9-318c886af95f
> clpen0004.foo.com:8090 PutKafka[id=6162d00f-737f-3710-85f9-318c886af95f]
> PutKafka[id=6162d00f-737f-3710-85f9-318c886af95f] failed to process session
> due to java.lang.IllegalStateException:
> java.util.concurrent.ExecutionException:
> org.apache.nifi.processor.exception.FlowFileHandlingException:
> StandardFlowFileRecord[uuid=a9a7f10d-674e-421f-80f2-7fc0e28a0d1d,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1458158883054-93724,
> container=cont2, section=540], offset=756882,
> length=6107144],offset=0,name=1648095619968535,size=6107144] is not known in
> this session (StandardProcessSession[id=97534]):
> java.lang.IllegalStateException: java.util.concurrent.ExecutionException:
> org.apache.nifi.processor.exception.FlowFileHandlingException:
> StandardFlowFileRecord[uuid=a9a7f10d-674e-421f-80f2-7fc0e28a0d1d,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1458158883054-93724,
> container=cont2, section=540], offset=756882,
> length=6107144],offset=0,name=1648095619968535,size=6107144] is not known in
> this session (StandardProcessSession[id=97534])
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)