[ https://issues.apache.org/jira/browse/NIFI-4637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16456684#comment-16456684 ]
ASF GitHub Bot commented on NIFI-4637: -------------------------------------- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2518#discussion_r184734660 --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseCells.java --- @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hbase; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Scanner; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttributes({ + @WritesAttribute(attribute = "error.line", description = "The line number of the error."), + @WritesAttribute(attribute = "error.msg", description = "The message explaining the error.") +}) +@Tags({"hbase", "delete", "cell", "cells", "visibility"}) +@CapabilityDescription("This processor allows the user to delete individual HBase cells by specifying one or more lines " + + "in the flowfile content that are a sequence composed of row ID, column family, column qualifier and associated visibility labels " + + "if visibility labels are enabled and in use. A user-defined separator is used to separate each of these pieces of data on each " + + "line, with :::: being the default separator.") +public class DeleteHBaseCells extends AbstractDeleteHBase { + static final PropertyDescriptor SEPARATOR = new PropertyDescriptor.Builder() + .name("delete-hbase-cell-separator") + .displayName("Separator") + .description("Each line of the flowfile content is separated into components for building a delete using this" + + "separator. It should be something other than a single colon or a comma because these are values that " + + "are associated with columns and visibility labels respectively. To delete a row with ID xyz, column family abc, " + + "column qualifier def and visibility label PII&PHI, one would specify xyz::::abc::::def::::PII&PHI given the default " + + "value") + .required(true) + .defaultValue("::::") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final String ERROR_LINE = "error.line"; + static final String ERROR_MSG = "error.msg"; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(HBASE_CLIENT_SERVICE); + properties.add(TABLE_NAME); + properties.add(SEPARATOR); + + return properties; + } + + private FlowFile writeErrorAttributes(int line, String msg, FlowFile file, ProcessSession session) { + file = session.putAttribute(file, ERROR_LINE, String.valueOf(line)); + file = session.putAttribute(file, ERROR_MSG, msg != null ? msg : ""); + return file; + } + + private void logCell(String rowId, String family, String column, String visibility) { + StringBuilder sb = new StringBuilder() + .append("Assembling cell delete for...\t") + .append(String.format("Row ID: %s\t", rowId)) + .append(String.format("Column Family: %s\t", family)) + .append(String.format("Column Qualifier: %s\t", column)) + .append(String.format("Visibility Label: %s", visibility)); + getLogger().debug(sb.toString()); + } + + @Override + protected void doDelete(ProcessContext context, ProcessSession session) throws Exception { + FlowFile input = session.get(); + if (input == null) { + return; + } + + final String separator = context.getProperty(SEPARATOR).evaluateAttributeExpressions(input).getValue(); + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(input).getValue(); + try (InputStream is = session.read(input)) { + Scanner scanner = new Scanner(is); + int lineNum = 1; + List<DeleteRequest> deletes = new ArrayList<>(); + while (scanner.hasNextLine()) { + String line = scanner.nextLine().trim(); + if (line.equals("")) { + continue; + } + String[] parts = line.split(separator); + if (parts.length < 3 || parts.length > 4) { + final String msg = String.format("Invalid line length. It must have 3 or 4 components. It had %d.", parts.length); + input = writeErrorAttributes(lineNum, msg, input, session); + session.transfer(input, REL_FAILURE); + getLogger().error(msg); + return; + } + String rowId = parts[0]; + String family = parts[1]; + String column = parts[2]; + String visibility = parts.length == 4 ? parts[3] : null; + + DeleteRequest request = new DeleteRequest(rowId.getBytes(), family.getBytes(), column.getBytes(), visibility); + deletes.add(request); + + if (getLogger().isDebugEnabled()) { + logCell(rowId, family, column, visibility); + } + + lineNum++; + } + is.close(); + clientService.deleteCells(tableName, deletes); + + session.transfer(input, REL_SUCCESS); + } catch (Exception ex) { + input = writeErrorAttributes(-1, ex.getMessage(), input, session); --- End diff -- Done. > Add support for HBase visibility labels to HBase processors and controller > services > ----------------------------------------------------------------------------------- > > Key: NIFI-4637 > URL: https://issues.apache.org/jira/browse/NIFI-4637 > Project: Apache NiFi > Issue Type: Improvement > Reporter: Mike Thomsen > Assignee: Mike Thomsen > Priority: Major > > HBase supports visibility labels, but you can't use them from NiFi because > there is no way to set them. The existing processors and services should be > upgraded to handle this capability. -- This message was sent by Atlassian JIRA (v7.6.3#76005)