Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2518#discussion_r184727554
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
 ---
    @@ -336,51 +348,86 @@ public void shutdown() {
             }
         }
     
    +    private static final byte[] EMPTY_VIS_STRING;
    +
    +    static {
    +        try {
    +            EMPTY_VIS_STRING = "".getBytes("UTF-8");
    +        } catch (UnsupportedEncodingException e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    private List<Put> buildPuts(byte[] rowKey, List<PutColumn> columns) {
    +        List<Put> retVal = new ArrayList<>();
    +
    +        try {
    +            Put put = null;
    +
    +            for (final PutColumn column : columns) {
    +                if (put == null || (put.getCellVisibility() == null && 
column.getVisibility() != null) || ( put.getCellVisibility() != null
    +                        && 
!put.getCellVisibility().getExpression().equals(column.getVisibility())
    +                    )) {
    +                    put = new Put(rowKey);
    +
    +                    if (column.getVisibility() != null) {
    +                        put.setCellVisibility(new 
CellVisibility(column.getVisibility()));
    +                    }
    +                    retVal.add(put);
    +                }
    +
    +                if (column.getTimestamp() != null) {
    +                    put.addColumn(
    +                            column.getColumnFamily(),
    +                            column.getColumnQualifier(),
    +                            column.getTimestamp(),
    +                            column.getBuffer());
    +                } else {
    +                    put.addColumn(
    +                            column.getColumnFamily(),
    +                            column.getColumnQualifier(),
    +                            column.getBuffer());
    +                }
    +            }
    +        } catch (DeserializationException de) {
    +            getLogger().error("Error writing cell visibility statement.", 
de);
    +            throw new RuntimeException(de);
    +        }
    +
    +        return retVal;
    +    }
    +
         @Override
         public void put(final String tableName, final Collection<PutFlowFile> 
puts) throws IOException {
             try (final Table table = 
connection.getTable(TableName.valueOf(tableName))) {
                 // Create one Put per row....
                 final Map<String, Put> rowPuts = new HashMap<>();
    +            final Map<String, List<PutColumn>> sorted = new HashMap<>();
    +            final List<Put> newPuts = new ArrayList<>();
    +
                 for (final PutFlowFile putFlowFile : puts) {
    -                //this is used for the map key as a byte[] does not work 
as a key.
                     final String rowKeyString = new 
String(putFlowFile.getRow(), StandardCharsets.UTF_8);
    -                Put put = rowPuts.get(rowKeyString);
    -                if (put == null) {
    -                    put = new Put(putFlowFile.getRow());
    -                    rowPuts.put(rowKeyString, put);
    +                List<PutColumn> columns = sorted.get(rowKeyString);
    +                if (columns == null) {
    +                    columns = new ArrayList<>();
    +                    sorted.put(rowKeyString, columns);
                     }
     
    -                for (final PutColumn column : putFlowFile.getColumns()) {
    -                    if (column.getTimestamp() != null) {
    -                        put.addColumn(
    -                                column.getColumnFamily(),
    -                                column.getColumnQualifier(),
    -                                column.getTimestamp(),
    -                                column.getBuffer());
    -                    } else {
    -                        put.addColumn(
    -                                column.getColumnFamily(),
    -                                column.getColumnQualifier(),
    -                                column.getBuffer());
    -                    }
    -                }
    +                columns.addAll(putFlowFile.getColumns());
    +            }
    +
    +            for (final Map.Entry<String, List<PutColumn>> entry : 
sorted.entrySet()) {
    +                
newPuts.addAll(buildPuts(entry.getKey().getBytes(StandardCharsets.UTF_8), 
entry.getValue()));
                 }
     
    -            table.put(new ArrayList<>(rowPuts.values()));
    +            table.put(new ArrayList<>(newPuts)); /*rowPuts.values()));*/
    --- End diff --
    
    Done. Not even sure why I did that...


---

Reply via email to