[ 
https://issues.apache.org/jira/browse/NIFI-3538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292888#comment-16292888
 ] 

ASF GitHub Bot commented on NIFI-3538:
--------------------------------------

Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2294#discussion_r157258216
  
    --- Diff: 
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseRow.java
 ---
    @@ -0,0 +1,178 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.hbase;
    +
    +import org.apache.commons.io.IOUtils;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +@Tags({ "delete", "hbase" })
    +@CapabilityDescription(
    +        "Delete HBase records individually or in batches. The input can be 
a single row ID in the body, one ID per line, " +
    +        "row IDs separated by commas or a combination of the two. ")
    +public class DeleteHBaseRow extends AbstractDeleteHBase {
    +    static final AllowableValue ROW_ID_BODY = new AllowableValue("body", 
"FlowFile content", "Get the row key(s) from the flowfile content.");
    +    static final AllowableValue ROW_ID_ATTR = new AllowableValue("attr", 
"FlowFile attributes", "Get the row key from an expression language 
statement.");
    +
    +    static final PropertyDescriptor ROW_ID_LOCATION = new 
PropertyDescriptor.Builder()
    +            .name("delete-hb-row-id-location")
    +            .displayName("Row ID Location")
    +            .description("The location of the row ID to use for building 
the delete. Can be from the content or an expression language statement.")
    +            .required(true)
    +            .defaultValue("body")
    +            .allowableValues(ROW_ID_BODY, ROW_ID_ATTR)
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    static final PropertyDescriptor FLOWFILE_FETCH_COUNT = new 
PropertyDescriptor.Builder()
    +            .name("delete-hb-flowfile-fetch-count")
    +            .displayName("Flowfile Fetch Count")
    +            .description("The number of flowfiles to fetch per run.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("5")
    +            .expressionLanguageSupported(false)
    +            .build();
    +
    +    static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("delete-hb-row-ff-count")
    +            .displayName("Batch Size")
    +            .description("The number of deletes to send per batch.")
    +            .required(true)
    +            .defaultValue("50")
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .expressionLanguageSupported(false)
    +            .build();
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> properties = 
super.getSupportedPropertyDescriptors();
    +        properties.add(ROW_ID_LOCATION);
    +        properties.add(FLOWFILE_FETCH_COUNT);
    +        properties.add(BATCH_SIZE);
    +
    +        return properties;
    +    }
    +
    +    @Override
    +    protected void doDelete(ProcessContext context, ProcessSession 
session) throws Exception {
    +        final int batchSize     = 
context.getProperty(BATCH_SIZE).asInteger();
    +        final String location   = 
context.getProperty(ROW_ID_LOCATION).getValue();
    +        final int flowFileCount = 
context.getProperty(FLOWFILE_FETCH_COUNT).asInteger();
    +        List<FlowFile> flowFiles = session.get(flowFileCount);
    +
    +        if (flowFiles != null && flowFiles.size() > 0) {
    +            try {
    +                if (location.equals(ROW_ID_BODY.getValue())) {
    +                    doDeleteFromBody(flowFiles, context, session, 
batchSize);
    +                } else {
    +                    doDeleteFromAttributes(flowFiles, context, session, 
batchSize);
    +                }
    +
    +                for (int index = 0; index < flowFiles.size(); index++) {
    +                    session.transfer(flowFiles.get(index), REL_SUCCESS);
    +                }
    +            } catch (Exception ex) {
    +                getLogger().error("");
    +                for (int index = 0; index < flowFiles.size(); index++) {
    +                    session.transfer(flowFiles.get(index), REL_FAILURE);
    +                }
    +            }
    +        }
    +    }
    +
    +    private void doDeleteFromAttributes(List<FlowFile> flowFiles, 
ProcessContext context, ProcessSession session, int batchSize) throws Exception 
{
    +        Map<String, List<byte[]>> deletes = new HashMap<>();
    +        for (FlowFile flowFile : flowFiles) {
    +            String rowKey = 
context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
    +            String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +            if (rowKey == null || rowKey.trim().equals("")) {
    +                throw new RuntimeException("Could not find a row key with 
provided EL statement, stopping...");
    +            }
    +
    +            List<byte[]> batch = deletes.get(tableName);
    +            if (batch == null) {
    +                batch = new ArrayList<>();
    +                deletes.put(tableName, batch);
    +            }
    +
    +            batch.add(rowKey.getBytes("UTF-8"));
    +        }
    +
    +        processDeletes(deletes, batchSize);
    +    }
    +
    +    private void doDeleteFromBody(List<FlowFile> flowFiles, ProcessContext 
context, ProcessSession session, int batchSize) throws Exception {
    +        Map<String, List<byte[]>> deletes = new HashMap<>();
    +        for (FlowFile flowFile : flowFiles) {
    +            session.read(flowFile, in -> {
    +                String data = IOUtils.toString(in, "UTF-8");
    +                String[] lines = data.split("[\\r\\n]");
    +                final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +                for (String line : lines) {
    +                    String[] separated = line.split(",");
    +                    for (String part : separated) {
    +                        List<byte[]> batch = deletes.get(tableName);
    +                        if (batch == null) {
    +                            batch = new ArrayList<>();
    +                            deletes.put(tableName, batch);
    +                        }
    +                        batch.add(part.trim().getBytes("UTF-8"));
    +                    }
    +                }
    +            });
    +        }
    +
    +        processDeletes(deletes, batchSize);
    +    }
    +
    +    private void processDeletes(Map<String, List<byte[]>> deletes, int 
batchSize) throws IOException {
    +        for (Map.Entry<String, List<byte[]>> entry : deletes.entrySet()) {
    +            if (entry.getValue().size() <= batchSize) {
    +                clientService.delete(entry.getKey(), entry.getValue());
    +            } else {
    --- End diff --
    
    I think this is quite dangerous behavior. It might lead to a situation in 
which some of the processed flowfiles succeed and some do not. Can't we just 
submit one batch per run? The user can control the batch size using 
FLOWFILE_FETCH_COUNT...


> Add DeleteHBase processor(s)
> ----------------------------
>
>                 Key: NIFI-3538
>                 URL: https://issues.apache.org/jira/browse/NIFI-3538
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Matt Burgess
>            Assignee: Mike Thomsen
>
> NiFi currently has processors for storing and retrieving cells/rows in HBase, 
> but there is no mechanism for deleting records and/or tables.
> I'm not sure if a single DeleteHBase processor could accomplish both; that 
> can be discussed under this Jira (and can be split out if necessary).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to