[ https://issues.apache.org/jira/browse/NIFI-1645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206718#comment-15206718 ]

ASF GitHub Bot commented on NIFI-1645:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/295#discussion_r57019067
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java ---
    @@ -0,0 +1,87 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.kafka;
    +
    +import java.util.Random;
    +
    +import kafka.producer.Partitioner;
    +import kafka.utils.VerifiableProperties;
    +
    +/**
    + * Collection of implementations of common Kafka {@link Partitioner}s.
    + */
    +final public class Partitioners {
    +
    +    private Partitioners() {
    +    }
    +    /**
    +     * {@link Partitioner} that implements 'round-robin' mechanism which evenly
    +     * distributes load between all available partitions.
    +     */
    +    public static class RoundRobinPartitioner implements Partitioner {
    +        private volatile int index;
    +
    +        public RoundRobinPartitioner(VerifiableProperties props) {
    +        }
    +
    +        @Override
    +        public int partition(Object key, int numberOfPartitions) {
    +            int partitionIndex = this.next(numberOfPartitions);
    +            return partitionIndex;
    +        }
    +
    +        private int next(int numberOfPartitions) {
    +            if (index == numberOfPartitions) {
    +                index = 0;
    +            }
    +            int indexToReturn = index++;
    +            return indexToReturn;
    +        }
    +    }
    +
    +    /**
    +     * {@link Partitioner} that implements 'random' mechanism which randomly
    +     * distributes the load between all available partitions.
    +     */
    +    public static class RandomPartitioner implements Partitioner {
    +        private final Random random;
    +
    +        public RandomPartitioner(VerifiableProperties props) {
    +            this.random = new Random();
    +        }
    +
    +        @Override
    +        public int partition(Object key, int numberOfPartitions) {
    +            return this.random.nextInt(numberOfPartitions);
    +        }
    +    }
    +
    +    /**
    +     * {@link Partitioner} that implements 'key hash' mechanism which
    +     * distributes the load between all available partitions based on hashing
    +     * the value of the key.
    +     */
    +    public static class HashPartitioner implements Partitioner {
    +        public HashPartitioner(VerifiableProperties props) {
    +        }
    +
    +        @Override
    +        public int partition(Object key, int numberOfPartitions) {
    +            return (key.hashCode() & Integer.MAX_VALUE) % numberOfPartitions;
    --- End diff --
    
    What is the logic here behind `& Integer.MAX_VALUE`? Since `hashCode()` 
returns an int, wouldn't a bitwise AND with `Integer.MAX_VALUE` just return 
the same value that `hashCode()` returned?
    
    Also, `key` could be null. Is this going to cause a problem?
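
    For reference, a minimal sketch of what the mask does, written as a standalone class rather than as the code in the pull request. `Integer.MAX_VALUE` is `0x7FFFFFFF`, so the AND clears the sign bit: non-negative hash codes pass through unchanged, while negative ones become non-negative, which keeps the `%` result from being negative. The class name `HashMaskSketch`, the helper `partitionFor`, and the choice to send a null key to partition 0 are assumptions made for illustration only.

```java
class HashMaskSketch {

    // Hypothetical helper, not part of the pull request: maps a key to a
    // partition in [0, numberOfPartitions).
    static int partitionFor(Object key, int numberOfPartitions) {
        if (key == null) {
            // Assumption: send null keys to partition 0. Without a guard,
            // key.hashCode() would throw a NullPointerException.
            return 0;
        }
        // Clearing the sign bit keeps the dividend non-negative, so the
        // remainder can never be negative.
        return (key.hashCode() & Integer.MAX_VALUE) % numberOfPartitions;
    }

    public static void main(String[] args) {
        // Integer.hashCode() is the int value itself, so this key hashes to -7.
        Object key = Integer.valueOf(-7);

        System.out.println(key.hashCode() % 4);                       // unmasked: -3
        System.out.println((key.hashCode() & Integer.MAX_VALUE) % 4); // masked: 1
        System.out.println(partitionFor(null, 4));                    // null key: 0
    }
}
```

    The unmasked remainder is negative for a negative hash code, which is not a valid partition index. How a null key should actually be handled (a fixed partition, random selection, or rejecting the message) is a separate design decision for the processor.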


> When using delimited data feature PutKafka ack'd ranges feature can break
> -------------------------------------------------------------------------
>
>                 Key: NIFI-1645
>                 URL: https://issues.apache.org/jira/browse/NIFI-1645
>             Project: Apache NiFi
>          Issue Type: Bug
>            Reporter: Oleg Zhurakousky
>            Assignee: Oleg Zhurakousky
>             Fix For: 0.7.0
>
>
> When using the delimited lines feature to send data to Kafka, a large set 
> of lines that appears to be one 'flowfile' in NiFi is sent as a series of 
> 1..N messages in Kafka. In that case the asynchronous acknowledgement 
> mechanism can break down: we receive acknowledgements but are unable to act 
> on them appropriately, because by then the session/data has already been 
> considered successfully transferred.  Under certain conditions this could 
> mean that failed acknowledgements do not result in a retransfer.
> The logic this processor supports for creating child objects to address 
> failed/partial segments is extremely complicated and should likely be 
> rewritten and greatly simplified.  Instead, the SplitText feature should be 
> used to create more manageable chunks of data, so that if any segment is 
> ack'd as a failure the whole chunk is failed and can be retransmitted.  It 
> is always best to let the user choose between data loss and data 
> duplication on their own terms.
> Below is the relevant stack trace
> {code}
> 17:12:37 EDT ERROR 6162d00f-737f-3710-85f9-318c886af95f
> clpen0004.foo.com:8090 PutKafka[id=6162d00f-737f-3710-85f9-318c886af95f] 
> PutKafka[id=6162d00f-737f-3710-85f9-318c886af95f] failed to process session 
> due to java.lang.IllegalStateException: 
> java.util.concurrent.ExecutionException: 
> org.apache.nifi.processor.exception.FlowFileHandlingException: 
> StandardFlowFileRecord[uuid=a9a7f10d-674e-421f-80f2-7fc0e28a0d1d,claim=StandardContentClaim
>  [resourceClaim=StandardResourceClaim[id=1458158883054-93724, 
> container=cont2, section=540], offset=756882, 
> length=6107144],offset=0,name=1648095619968535,size=6107144] is not known in 
> this session (StandardProcessSession[id=97534]): 
> java.lang.IllegalStateException: java.util.concurrent.ExecutionException: 
> org.apache.nifi.processor.exception.FlowFileHandlingException: 
> StandardFlowFileRecord[uuid=a9a7f10d-674e-421f-80f2-7fc0e28a0d1d,claim=StandardContentClaim
>  [resourceClaim=StandardResourceClaim[id=1458158883054-93724, 
> container=cont2, section=540], offset=756882, 
> length=6107144],offset=0,name=1648095619968535,size=6107144] is not known in 
> this session (StandardProcessSession[id=97534])
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
