Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2518#discussion_r184727554 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java --- @@ -336,51 +348,86 @@ public void shutdown() { } } + private static final byte[] EMPTY_VIS_STRING; + + static { + try { + EMPTY_VIS_STRING = "".getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + + private List<Put> buildPuts(byte[] rowKey, List<PutColumn> columns) { + List<Put> retVal = new ArrayList<>(); + + try { + Put put = null; + + for (final PutColumn column : columns) { + if (put == null || (put.getCellVisibility() == null && column.getVisibility() != null) || ( put.getCellVisibility() != null + && !put.getCellVisibility().getExpression().equals(column.getVisibility()) + )) { + put = new Put(rowKey); + + if (column.getVisibility() != null) { + put.setCellVisibility(new CellVisibility(column.getVisibility())); + } + retVal.add(put); + } + + if (column.getTimestamp() != null) { + put.addColumn( + column.getColumnFamily(), + column.getColumnQualifier(), + column.getTimestamp(), + column.getBuffer()); + } else { + put.addColumn( + column.getColumnFamily(), + column.getColumnQualifier(), + column.getBuffer()); + } + } + } catch (DeserializationException de) { + getLogger().error("Error writing cell visibility statement.", de); + throw new RuntimeException(de); + } + + return retVal; + } + @Override public void put(final String tableName, final Collection<PutFlowFile> puts) throws IOException { try (final Table table = connection.getTable(TableName.valueOf(tableName))) { // Create one Put per row.... final Map<String, Put> rowPuts = new HashMap<>(); + final Map<String, List<PutColumn>> sorted = new HashMap<>(); + final List<Put> newPuts = new ArrayList<>(); + for (final PutFlowFile putFlowFile : puts) { - //this is used for the map key as a byte[] does not work as a key. final String rowKeyString = new String(putFlowFile.getRow(), StandardCharsets.UTF_8); - Put put = rowPuts.get(rowKeyString); - if (put == null) { - put = new Put(putFlowFile.getRow()); - rowPuts.put(rowKeyString, put); + List<PutColumn> columns = sorted.get(rowKeyString); + if (columns == null) { + columns = new ArrayList<>(); + sorted.put(rowKeyString, columns); } - for (final PutColumn column : putFlowFile.getColumns()) { - if (column.getTimestamp() != null) { - put.addColumn( - column.getColumnFamily(), - column.getColumnQualifier(), - column.getTimestamp(), - column.getBuffer()); - } else { - put.addColumn( - column.getColumnFamily(), - column.getColumnQualifier(), - column.getBuffer()); - } - } + columns.addAll(putFlowFile.getColumns()); + } + + for (final Map.Entry<String, List<PutColumn>> entry : sorted.entrySet()) { + newPuts.addAll(buildPuts(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue())); } - table.put(new ArrayList<>(rowPuts.values())); + table.put(new ArrayList<>(newPuts)); /*rowPuts.values()));*/ --- End diff -- Done. Not even sure why I did that...
---