[
https://issues.apache.org/jira/browse/NIFI-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16107695#comment-16107695
]
ASF GitHub Bot commented on NIFI-4024:
--------------------------------------
Github user bbende commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1961#discussion_r130415687
--- Diff:
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hbase.put.PutColumn;
+import org.apache.nifi.hbase.put.PutFlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hadoop", "hbase", "put", "record"})
+@CapabilityDescription("Adds rows to HBase based on the contents of a
flowfile using a configured record reader.")
+@ReadsAttribute(attribute = "restart.index", description = "Reads
restart.index when it needs to replay part of a record set that did not get
into HBase.")
+@WritesAttribute(attribute = "restart.index", description = "Writes
restart.index when a batch fails to be insert into HBase")
+public class PutHBaseRecord extends AbstractPutHBase {
+
+ protected static final PropertyDescriptor ROW_FIELD_NAME = new
PropertyDescriptor.Builder()
+ .name("Row Identifier Field Path")
+ .description("Specifies the name of a record field whose value
should be used as the row id for the given record.")
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ protected static final String FAIL_VALUE = "Fail";
+ protected static final String WARN_VALUE = "Warn";
+ protected static final String IGNORE_VALUE = "Ignore";
+ protected static final String TEXT_VALUE = "Text";
+
+ protected static final AllowableValue COMPLEX_FIELD_FAIL = new
AllowableValue(FAIL_VALUE, FAIL_VALUE, "Route entire FlowFile to failure if any
elements contain complex values.");
+ protected static final AllowableValue COMPLEX_FIELD_WARN = new
AllowableValue(WARN_VALUE, WARN_VALUE, "Provide a warning and do not include
field in row sent to HBase.");
+ protected static final AllowableValue COMPLEX_FIELD_IGNORE = new
AllowableValue(IGNORE_VALUE, IGNORE_VALUE, "Silently ignore and do not include
in row sent to HBase.");
+ protected static final AllowableValue COMPLEX_FIELD_TEXT = new
AllowableValue(TEXT_VALUE, TEXT_VALUE, "Use the string representation of the
complex field as the value of the given column.");
+
+ static final PropertyDescriptor RECORD_READER_FACTORY = new
PropertyDescriptor.Builder()
+ .name("record-reader")
+ .displayName("Record Reader")
+ .description("Specifies the Controller Service to use for
parsing incoming data and determining the data's schema")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .build();
+
+ protected static final PropertyDescriptor COMPLEX_FIELD_STRATEGY = new
PropertyDescriptor.Builder()
+ .name("Complex Field Strategy")
+ .description("Indicates how to handle complex fields, i.e.
fields that do not have a single text value.")
+ .expressionLanguageSupported(false)
+ .required(true)
+ .allowableValues(COMPLEX_FIELD_FAIL, COMPLEX_FIELD_WARN,
COMPLEX_FIELD_IGNORE, COMPLEX_FIELD_TEXT)
+ .defaultValue(COMPLEX_FIELD_TEXT.getValue())
+ .build();
+
+
+ protected static final AllowableValue FIELD_ENCODING_STRING = new
AllowableValue(STRING_ENCODING_VALUE, STRING_ENCODING_VALUE,
+ "Stores the value of each field as a UTF-8 String.");
+ protected static final AllowableValue FIELD_ENCODING_BYTES = new
AllowableValue(BYTES_ENCODING_VALUE, BYTES_ENCODING_VALUE,
+ "Stores the value of each field as the byte representation of
the type derived from the record.");
+
+ protected static final PropertyDescriptor FIELD_ENCODING_STRATEGY =
new PropertyDescriptor.Builder()
+ .name("Field Encoding Strategy")
+ .description(("Indicates how to store the value of each field
in HBase. The default behavior is to convert each value from the " +
+ "record to a String, and store the UTF-8 bytes.
Choosing Bytes will interpret the type of each field from " +
+ "the record, and convert the value to the byte
representation of that type, meaning an integer will be stored as the " +
+ "byte representation of that integer."))
+ .required(true)
+ .allowableValues(FIELD_ENCODING_STRING, FIELD_ENCODING_BYTES)
+ .defaultValue(FIELD_ENCODING_STRING.getValue())
+ .build();
+
+ protected static final PropertyDescriptor BATCH_SIZE = new
PropertyDescriptor.Builder()
+ .name("Batch Size")
+ .description("The maximum number of records to be sent to
HBase at any one time from the record set.")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("1000")
+ .build();
+
+ @Override
+ public final List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(RECORD_READER_FACTORY);
+ properties.add(HBASE_CLIENT_SERVICE);
+ properties.add(TABLE_NAME);
+ properties.add(ROW_FIELD_NAME);
+ properties.add(ROW_ID_ENCODING_STRATEGY);
+ properties.add(COLUMN_FAMILY);
+ properties.add(BATCH_SIZE);
+ properties.add(COMPLEX_FIELD_STRATEGY);
+ properties.add(FIELD_ENCODING_STRATEGY);
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> rels = new HashSet<>();
+ rels.add(REL_SUCCESS);
+ rels.add(REL_FAILURE);
+ return rels;
+ }
+
+ private int addBatch(String tableName, List<PutFlowFile> flowFiles)
throws IOException {
+ int columns = 0;
+ clientService.put(tableName, flowFiles);
+ for (PutFlowFile put : flowFiles) {
+ columns += put.getColumns().size();
+ }
+
+ return columns;
+ }
+
+ private RecordReaderFactory recordParserFactory;
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final RecordReaderFactory recordParserFactory =
context.getProperty(RECORD_READER_FACTORY)
+ .asControllerService(RecordReaderFactory.class);
+ List<PutFlowFile> flowFiles = new ArrayList<>();
+ final String tableName =
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String rowFieldName =
context.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String columnFamily =
context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
+ final String fieldEncodingStrategy =
context.getProperty(FIELD_ENCODING_STRATEGY).getValue();
+ final String complexFieldStrategy =
context.getProperty(COMPLEX_FIELD_STRATEGY).getValue();
+ final String rowEncodingStrategy =
context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue();
+
+ final long start = System.nanoTime();
+ int index = 0;
+ int columns = 0;
+ boolean failed = false;
+ String startIndexStr = flowFile.getAttribute("restart.index");
+ int startIndex = -1;
+ if (startIndexStr != null) {
+ startIndex = Integer.parseInt(startIndexStr);
+ }
+
+ PutFlowFile last = null;
+ try (RecordReader reader =
recordParserFactory.createRecordReader(flowFile, session.read(flowFile),
getLogger())) {
+ Record record;
+ if (startIndex >= 0) {
+ while ( index++ < startIndex && (reader.nextRecord()) !=
null) {}
+ }
+
+ while ((record = reader.nextRecord()) != null) {
+ PutFlowFile putFlowFile = createPut(context, record,
reader.getSchema(), flowFile, rowFieldName, columnFamily,
fieldEncodingStrategy, rowEncodingStrategy, complexFieldStrategy);
+ flowFiles.add(putFlowFile);
+ index++;
+
+ if (flowFiles.size() == batchSize) {
+ columns += addBatch(tableName, flowFiles);
+ last = flowFiles.get(flowFiles.size() - 1);
+ flowFiles = new ArrayList<>();
+ }
+ }
+ if (flowFiles.size() > 0) {
+ columns += addBatch(tableName, flowFiles);
+ last = flowFiles.get(flowFiles.size() - 1);
+ }
+ } catch (Exception ex) {
+ getLogger().error("Failed to put records to HBase.", ex);
+ failed = true;
+ }
+
+ if (!failed) {
+ sendProvenance(session, flowFile, columns, System.nanoTime() -
start, last);
+ flowFile = session.removeAttribute(flowFile, "restart.index");
+ session.transfer(flowFile, REL_SUCCESS);
+ } else {
+ String restartIndex = Integer.toString(index -
flowFiles.size());
+ flowFile = session.putAttribute(flowFile, "restart.index",
restartIndex);
+ sendProvenance(session, flowFile, columns, System.nanoTime() -
start, last);
--- End diff --
Should we wrap this call in a conditional such as "if (columns > 0)", so that
provenance is only sent in that case?
This would stop us from reporting a provenance event for cases where we
didn't send anything to HBase successfully.
> Create EvaluateRecordPath processor
> -----------------------------------
>
> Key: NIFI-4024
> URL: https://issues.apache.org/jira/browse/NIFI-4024
> Project: Apache NiFi
> Issue Type: New Feature
> Reporter: Steve Champagne
> Priority: Minor
>
> With the new RecordPath DSL, it would be nice if there was a processor that
> could pull fields into attributes of the flowfile based on a RecordPath. This
> would be similar to the EvaluateJsonPath processor that currently exists,
> except it could be used to pull fields from arbitrary record formats. My
> current use case for it would be pulling fields out of Avro records while
> skipping the steps of having to convert Avro to JSON, evaluate JsonPath, and
> then convert back to Avro.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)