tpalfy commented on a change in pull request #3611: NIFI-6009 ScanKudu Processor
URL: https://github.com/apache/nifi/pull/3611#discussion_r326717059
 
 

 ##########
 File path: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/ScanKudu.java
 ##########
 @@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kudu;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.RowResult;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processors.kudu.io.ResultHandler;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.security.krb.KerberosAction;
+import org.apache.nifi.security.krb.KerberosUser;
+
+import javax.security.auth.login.LoginException;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"kudu", "scan", "fetch", "get"})
+@CapabilityDescription("Scans rows from a Kudu table with an optional list of 
predicates")
+@WritesAttributes({
+        @WritesAttribute(attribute = "kudu.table", description = "The name of 
the Kudu table that the row was fetched from"),
+        @WritesAttribute(attribute = "mime.type", description = "Set to 
application/json when using a Destination of flowfile-content, not set or 
modified otherwise"),
+        @WritesAttribute(attribute = "kudu.rows.count", description = "Number 
of rows in the content of given flow file"),
+        @WritesAttribute(attribute = "scankudu.results.found", description = 
"Indicates whether at least one row has been found in given Kudu table with 
provided predicates. "
+                + "Could be null (not present) if transfered to FAILURE")})
+public class ScanKudu extends AbstractKuduProcessor {
+
+    static final Pattern PREDICATES_PATTERN = 
Pattern.compile("\\w+((<=|>=|[=<>])(\\w|-)+)?(?:,\\w+((<=|>=|[=<>])(\\w|-)+)?)*");
+    static final Pattern COLUMNS_PATTERN = 
Pattern.compile("\\w+((\\w)+)?(?:,\\w+((\\w)+)?)*");
+
+    protected static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
+            .name("table-name")
+            .displayName("Table Name")
+            .description("The name of the Kudu Table to put data into")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    static final PropertyDescriptor PREDICATES = new 
PropertyDescriptor.Builder()
+            .name("Predicates")
+            .description("A comma-separated list of Predicates, format: 
\"<colName>:<value>\" ")
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            
.addValidator(StandardValidators.createRegexMatchingValidator(PREDICATES_PATTERN))
+            .build();
+
+    static final PropertyDescriptor PROJECTED_COLUMNS = new 
PropertyDescriptor.Builder()
+            .name("Projected Column Names")
+            .description("A comma-separated list of \"<column>\" names to 
return when scanning, default all.")
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            
.addValidator(StandardValidators.createRegexMatchingValidator(COLUMNS_PATTERN))
+            .build();
+
+    protected static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Batch Size")
+            .description("The maximum number of FlowFiles to process in a 
single execution, between 1 - 100000. " +
+                    "Depending on your memory size, and data size per row set 
an appropriate batch size. " +
+                    "Gradually increase this number to find out the best one 
for best performances.")
+            .defaultValue("500")
+            .required(true)
+            .addValidator(StandardValidators.createLongValidator(1, 100000, 
true))
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All FlowFiles are routed to this relationship")
+            .build();
+
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("The original input file will be routed to this 
destination, even if no rows are retrieved based on provided conditions.")
+            .build();
+
+    protected static final Relationship REL_FAILURE = new 
Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if it is 
not able to read from Kudu")
+            .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(KUDU_MASTERS);
+        properties.add(KERBEROS_CREDENTIALS_SERVICE);
+        properties.add(KUDU_OPERATION_TIMEOUT_MS);
+        properties.add(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS);
+        properties.add(TABLE_NAME);
+        properties.add(PREDICATES);
+        properties.add(PROJECTED_COLUMNS);
+        properties.add(BATCH_SIZE);
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    static final Set<Relationship> relationships;
+    static {
+        Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_SUCCESS);
+        rels.add(REL_ORIGINAL);
+        rels.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(rels);
+    }
+
+    static final String KUDU_TABLE_ATTR = "kudu.table";
+
+    static final String KUDU_ROWS_COUNT_ATTR = "kudu.rows.count";
+
+    protected int batchSize = 500;
+
+    protected KuduTable kuduTable;
+
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) throws 
LoginException {
+        createKuduClient(context);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final KerberosUser user = getKerberosUser();
 
 Review comment:
   This whole logic could probably go into `AbstractKuduProcessor`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to