[
https://issues.apache.org/jira/browse/NIFI-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078593#comment-16078593
]
ASF GitHub Bot commented on NIFI-4024:
--------------------------------------
GitHub user bbende commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1961#discussion_r126219285
--- Diff:
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hbase.put.PutColumn;
+import org.apache.nifi.hbase.put.PutFlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hadoop", "hbase", "put", "record"})
+@CapabilityDescription("Adds rows to HBase based on the contents of a
flowfile using a configured record reader.")
+public class PutHBaseRecord extends AbstractPutHBase {
+
+ protected static final PropertyDescriptor ROW_FIELD_NAME = new
PropertyDescriptor.Builder()
+ .name("Row Identifier Field Name")
+ .description("Specifies the name of a record field whose value
should be used as the row id for the given record.")
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ protected static final String FAIL_VALUE = "Fail";
+ protected static final String WARN_VALUE = "Warn";
+ protected static final String IGNORE_VALUE = "Ignore";
+ protected static final String TEXT_VALUE = "Text";
+
+ protected static final AllowableValue COMPLEX_FIELD_FAIL = new
AllowableValue(FAIL_VALUE, FAIL_VALUE, "Route entire FlowFile to failure if any
elements contain complex values.");
+ protected static final AllowableValue COMPLEX_FIELD_WARN = new
AllowableValue(WARN_VALUE, WARN_VALUE, "Provide a warning and do not include
field in row sent to HBase.");
+ protected static final AllowableValue COMPLEX_FIELD_IGNORE = new
AllowableValue(IGNORE_VALUE, IGNORE_VALUE, "Silently ignore and do not include
in row sent to HBase.");
+ protected static final AllowableValue COMPLEX_FIELD_TEXT = new
AllowableValue(TEXT_VALUE, TEXT_VALUE, "Use the string representation of the
complex field as the value of the given column.");
+
+ static final PropertyDescriptor RECORD_READER_FACTORY = new
PropertyDescriptor.Builder()
+ .name("record-reader")
+ .displayName("Record Reader")
+ .description("Specifies the Controller Service to use for
parsing incoming data and determining the data's schema")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .build();
+
+ protected static final PropertyDescriptor COMPLEX_FIELD_STRATEGY = new
PropertyDescriptor.Builder()
+ .name("Complex Field Strategy")
+ .description("Indicates how to handle complex fields, i.e.
fields that do not have a single text value.")
+ .expressionLanguageSupported(false)
+ .required(true)
+ .allowableValues(COMPLEX_FIELD_FAIL, COMPLEX_FIELD_WARN,
COMPLEX_FIELD_IGNORE, COMPLEX_FIELD_TEXT)
+ .defaultValue(COMPLEX_FIELD_TEXT.getValue())
+ .build();
+
+
+ protected static final AllowableValue FIELD_ENCODING_STRING = new
AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
+ "Stores the value of each field as a UTF-8 String.");
+ protected static final AllowableValue FIELD_ENCODING_BYTES = new
AllowableValue(BYTES_ENCODING_VALUE, BYTES_ENCODING_VALUE,
+ "Stores the value of each field as the byte representation of
the type derived from the record.");
+
+ protected static final PropertyDescriptor FIELD_ENCODING_STRATEGY =
new PropertyDescriptor.Builder()
+ .name("Field Encoding Strategy")
+ .description(("Indicates how to store the value of each field
in HBase. The default behavior is to convert each value from the " +
+ "record to a String, and store the UTF-8 bytes.
Choosing Bytes will interpret the type of each field from " +
+ "the record, and convert the value to the byte
representation of that type, meaning an integer will be stored as the " +
+ "byte representation of that integer."))
+ .required(true)
+ .allowableValues(FIELD_ENCODING_STRING, FIELD_ENCODING_BYTES)
+ .defaultValue(FIELD_ENCODING_STRING.getValue())
+ .build();
+
+ protected static final PropertyDescriptor BATCH_SIZE = new
PropertyDescriptor.Builder()
+ .name("Batch Size")
+ .description("The maximum number of records to be sent to
HBase at any one time from the record set.")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("1000")
+ .build();
+
+ @Override
+ public final List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(RECORD_READER_FACTORY);
+ properties.add(HBASE_CLIENT_SERVICE);
+ properties.add(TABLE_NAME);
+ properties.add(ROW_ID);
--- End diff --
I think we can remove ROW_ID from the supported properties here, since this
processor only uses ROW_FIELD_NAME to get the row id from each record.
> Create EvaluateRecordPath processor
> -----------------------------------
>
> Key: NIFI-4024
> URL: https://issues.apache.org/jira/browse/NIFI-4024
> Project: Apache NiFi
> Issue Type: New Feature
> Reporter: Steve Champagne
> Priority: Minor
>
> With the new RecordPath DSL, it would be nice if there was a processor that
> could pull fields into attributes of the flowfile based on a RecordPath. This
> would be similar to the EvaluateJsonPath processor that currently exists,
> except it could be used to pull fields from arbitrary record formats. My
> current use case for it would be pulling fields out of Avro records while
> skipping the steps of having to convert Avro to JSON, evaluate JsonPath, and
> then converting back to Avro.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)