[
https://issues.apache.org/jira/browse/NIFI-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074770#comment-16074770
]
ASF GitHub Bot commented on NIFI-4024:
--------------------------------------
Github user bbende commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1961#discussion_r125645738
--- Diff:
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
---
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hbase.put.PutColumn;
+import org.apache.nifi.hbase.put.PutFlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hadoop", "hbase", "put", "record"})
+@CapabilityDescription("Adds rows to HBase based on the contents of a
flowfile using a configured record reader.")
+public class PutHBaseRecord extends AbstractPutHBase {
+
+ protected static final PropertyDescriptor ROW_FIELD_NAME = new
PropertyDescriptor.Builder()
+ .name("Row Identifier Field Name")
+ .description("Specifies the name of a JSON element whose value
should be used as the row id for the given JSON document.")
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ protected static final String FAIL_VALUE = "Fail";
+ protected static final String WARN_VALUE = "Warn";
+ protected static final String IGNORE_VALUE = "Ignore";
+ protected static final String TEXT_VALUE = "Text";
+
+ protected static final AllowableValue COMPLEX_FIELD_FAIL = new
AllowableValue(FAIL_VALUE, FAIL_VALUE, "Route entire FlowFile to failure if any
elements contain complex values.");
+ protected static final AllowableValue COMPLEX_FIELD_WARN = new
AllowableValue(WARN_VALUE, WARN_VALUE, "Provide a warning and do not include
field in row sent to HBase.");
+ protected static final AllowableValue COMPLEX_FIELD_IGNORE = new
AllowableValue(IGNORE_VALUE, IGNORE_VALUE, "Silently ignore and do not include
in row sent to HBase.");
+ protected static final AllowableValue COMPLEX_FIELD_TEXT = new
AllowableValue(TEXT_VALUE, TEXT_VALUE, "Use the string representation of the
complex field as the value of the given column.");
+
+ static final PropertyDescriptor RECORD_READER_FACTORY = new
PropertyDescriptor.Builder()
+ .name("record-reader")
+ .displayName("Record Reader")
+ .description("Specifies the Controller Service to use for
parsing incoming data and determining the data's schema")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .build();
+
+ protected static final PropertyDescriptor COMPLEX_FIELD_STRATEGY = new
PropertyDescriptor.Builder()
+ .name("Complex Field Strategy")
+ .description("Indicates how to handle complex fields, i.e.
fields that do not have a single text value.")
+ .expressionLanguageSupported(false)
+ .required(true)
+ .allowableValues(COMPLEX_FIELD_FAIL, COMPLEX_FIELD_WARN,
COMPLEX_FIELD_IGNORE, COMPLEX_FIELD_TEXT)
+ .defaultValue(COMPLEX_FIELD_TEXT.getValue())
+ .build();
+
+
+ protected static final AllowableValue FIELD_ENCODING_STRING = new
AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
+ "Stores the value of each field as a UTF-8 String.");
+ protected static final AllowableValue FIELD_ENCODING_BYTES = new
AllowableValue(BYTES_ENCODING_VALUE, BYTES_ENCODING_VALUE,
+ "Stores the value of each field as the byte representation of
the type derived from the JSON.");
+
+ protected static final PropertyDescriptor FIELD_ENCODING_STRATEGY =
new PropertyDescriptor.Builder()
+ .name("Field Encoding Strategy")
+ .description(("Indicates how to store the value of each field
in HBase. The default behavior is to convert each value from the " +
+ "JSON to a String, and store the UTF-8 bytes. Choosing
Bytes will interpret the type of each field from " +
+ "the JSON, and convert the value to the byte
representation of that type, meaning an integer will be stored as the " +
+ "byte representation of that integer."))
+ .required(true)
+ .allowableValues(FIELD_ENCODING_STRING, FIELD_ENCODING_BYTES)
+ .defaultValue(FIELD_ENCODING_STRING.getValue())
+ .build();
+
+ @Override
+ public final List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(RECORD_READER_FACTORY);
+ properties.add(HBASE_CLIENT_SERVICE);
+ properties.add(TABLE_NAME);
+ properties.add(ROW_ID);
+ properties.add(ROW_FIELD_NAME);
+ properties.add(ROW_ID_ENCODING_STRATEGY);
+ properties.add(COLUMN_FAMILY);
+ properties.add(BATCH_SIZE);
+ properties.add(COMPLEX_FIELD_STRATEGY);
+ properties.add(FIELD_ENCODING_STRATEGY);
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> rels = new HashSet<>();
+ rels.add(REL_SUCCESS);
+ rels.add(REL_FAILURE);
+ return rels;
+ }
+
+
+ private RecordReaderFactory recordParserFactory;
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
--- End diff --
See the later comment about possibly changing the meaning of batch size; if we
did that, we would only grab one flow file here.
> Create EvaluateRecordPath processor
> -----------------------------------
>
> Key: NIFI-4024
> URL: https://issues.apache.org/jira/browse/NIFI-4024
> Project: Apache NiFi
> Issue Type: New Feature
> Reporter: Steve Champagne
> Priority: Minor
>
> With the new RecordPath DSL, it would be nice if there was a processor that
> could pull fields into attributes of the flowfile based on a RecordPath. This
> would be similar to the EvaluateJsonPath processor that currently exists,
> except it could be used to pull fields from arbitrary record formats. My
> current use case for it would be pulling fields out of Avro records while
> skipping the steps of having to convert Avro to JSON, evaluate JsonPath, and
> then converting back to Avro.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)