Github user ijokarumawak commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2518#discussion_r184269182
--- Diff:
nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
---
@@ -336,51 +348,86 @@ public void shutdown() {
}
}
+ private static final byte[] EMPTY_VIS_STRING;
+
+ static {
+ try {
+ EMPTY_VIS_STRING = "".getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private List<Put> buildPuts(byte[] rowKey, List<PutColumn> columns) {
+ List<Put> retVal = new ArrayList<>();
+
+ try {
+ Put put = null;
+
+ for (final PutColumn column : columns) {
+ if (put == null || (put.getCellVisibility() == null &&
column.getVisibility() != null) || ( put.getCellVisibility() != null
+ &&
!put.getCellVisibility().getExpression().equals(column.getVisibility())
+ )) {
+ put = new Put(rowKey);
+
+ if (column.getVisibility() != null) {
+ put.setCellVisibility(new
CellVisibility(column.getVisibility()));
+ }
+ retVal.add(put);
+ }
+
+ if (column.getTimestamp() != null) {
+ put.addColumn(
+ column.getColumnFamily(),
+ column.getColumnQualifier(),
+ column.getTimestamp(),
+ column.getBuffer());
+ } else {
+ put.addColumn(
+ column.getColumnFamily(),
+ column.getColumnQualifier(),
+ column.getBuffer());
+ }
+ }
+ } catch (DeserializationException de) {
+ getLogger().error("Error writing cell visibility statement.",
de);
+ throw new RuntimeException(de);
+ }
+
+ return retVal;
+ }
+
@Override
public void put(final String tableName, final Collection<PutFlowFile>
puts) throws IOException {
try (final Table table =
connection.getTable(TableName.valueOf(tableName))) {
// Create one Put per row....
final Map<String, Put> rowPuts = new HashMap<>();
+ final Map<String, List<PutColumn>> sorted = new HashMap<>();
+ final List<Put> newPuts = new ArrayList<>();
+
for (final PutFlowFile putFlowFile : puts) {
- //this is used for the map key as a byte[] does not work
as a key.
final String rowKeyString = new
String(putFlowFile.getRow(), StandardCharsets.UTF_8);
- Put put = rowPuts.get(rowKeyString);
- if (put == null) {
- put = new Put(putFlowFile.getRow());
- rowPuts.put(rowKeyString, put);
+ List<PutColumn> columns = sorted.get(rowKeyString);
+ if (columns == null) {
+ columns = new ArrayList<>();
+ sorted.put(rowKeyString, columns);
}
- for (final PutColumn column : putFlowFile.getColumns()) {
- if (column.getTimestamp() != null) {
- put.addColumn(
- column.getColumnFamily(),
- column.getColumnQualifier(),
- column.getTimestamp(),
- column.getBuffer());
- } else {
- put.addColumn(
- column.getColumnFamily(),
- column.getColumnQualifier(),
- column.getBuffer());
- }
- }
+ columns.addAll(putFlowFile.getColumns());
+ }
+
+ for (final Map.Entry<String, List<PutColumn>> entry :
sorted.entrySet()) {
+
newPuts.addAll(buildPuts(entry.getKey().getBytes(StandardCharsets.UTF_8),
entry.getValue()));
}
- table.put(new ArrayList<>(rowPuts.values()));
+ table.put(new ArrayList<>(newPuts)); /*rowPuts.values()));*/
--- End diff --
Do we have to wrap it with `new ArrayList` here? If not, let's just pass
`newPuts` directly to avoid unnecessary object creation.
---