alopresto commented on a change in pull request #3984:
URL: https://github.com/apache/nifi/pull/3984#discussion_r433348987



##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeHashRecord.java
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import com.sangupta.murmur.Murmur2;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.ProcessSession;
+
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Comparator;
+
+
+@SideEffectFree
+@Tags({"route", "distribute", "weighted", "record"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Processor that distribute data over user specified 
relationships by distribution key/keys." +
+        " Data is distributed across relationships in the amount proportional 
to the relationship weight. For example, if there are " +
+        "two relationships and the first has a weight of 9 while the second 
has a weight of 10, the first will be sent 9 / 19 parts" +
+        " of the rows, and the second will be sent 10 / 19. " +
+        "To select the relationship that a row of data is sent to, specified 
keys extracted from record as string," +
+        " join with `-` delimiter and hash evaluate from this string, its 
remainder is taken from dividing it " +
+        "by the total weight of the relationships. If there is specified 
single integer key, hash value will not be calculated " +
+        "and processor just take remainder of division by the sum of the 
relationship weights from this value." +
+        " The row is sent to the relationship" +
+        " that corresponds to the half-interval of the remainders from 
'prev_weight' to 'prev_weights + weight', where" +
+        " 'prev_weights' is the total weight of the relationships with the 
smallest number, and 'weight' is the weight of this relationship." +
+        " For example, if there are two relationships, and the first has a 
weight of 9 while the second has a weight of 10," +
+        " the row will be sent to the first relationship for the remainders 
from the range [0, 9), and to the second for the remainders from the range [9, 
19).")
+@DynamicProperty(name = "The name of the relationship to route data to",
+        value = "Weight for this relationship.",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+        description = "Each user-defined property specifies a relationship and 
weight for this.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
+        @WritesAttribute(attribute = "record.count", description = "The number 
of records selected by the query")
+})
+public class DistributeHashRecord extends AbstractProcessor {
+
+    public static final String MURMURHASH_32 = "murmurhash_32";
+
+    public static PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("reader")
+            .displayName("Record Reader")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .description("Specifies the Controller Service to use for parsing 
incoming data and determining the data's schema")
+            .build();
+
+    public static PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .name("writer")
+            .displayName("Record Writer")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(true)
+            .description("Specifies the Controller Service to use for writing 
out the records")
+            .build();
+
+    private static final String KEY_DELIMITER = ",";
+
+    public static PropertyDescriptor KEYS = new PropertyDescriptor.Builder()
+            .name("keys")
+            .displayName("Keys")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .description("Field names in a record separated by commas." +
+                    " If record has one key and this key is integer or long 
then hash function " +
+                    "will not be evaluated and processor will distribute 
record by this numerical value. " +
+                    "If this record has several keys for distribution or one 
key with not 'int' or 'long' type then processor will " +
+                    "obtain keys from record, trim them and join with `-` 
delimiter like <firstKey>-<secondKey>-<...> " +
+                    "then evaluate hash function which return numerical value 
for distribution")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static PropertyDescriptor HASH_FUNCTION = new 
PropertyDescriptor.Builder()
+            .name("hash function")
+            .displayName("Hash Function")
+            .required(true)
+            .description("Hash algorithm for keys hashing")
+            .allowableValues(MURMURHASH_32)

Review comment:
       I'm curious what this hash function is, and why there is a configurable 
property for it if there is only one allowable value. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to