ijokarumawak commented on a change in pull request #3462: NIFI-6243 Add Support for AtomicDistributedCache to the HBase 1.x and…
URL: https://github.com/apache/nifi/pull/3462#discussion_r282358979
 
 

 ##########
 File path: nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientMapCacheService.java
 ##########
 @@ -229,20 +230,53 @@ public void close() throws IOException {
     protected void finalize() throws Throwable {
     }
 
+    @Override
+    public <K, V> AtomicCacheEntry<K, V, Long> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+        final byte[] rowIdBytes = serialize(key, keySerializer);
+        final HBaseRowHandler handler = new HBaseRowHandler();
+
+        final List<Column> columnsList = new ArrayList<>(1);
+        columnsList.add(new Column(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes));
+
+        hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, authorizations, handler);
+
+        if (handler.numRows() > 1) {
+            throw new IOException("Found multiple rows in HBase for key");
+        } else if (handler.numRows() == 1) {
+            return new AtomicCacheEntry<>(key, deserialize(handler.getLastResultBytes(), valueDeserializer), handler.getLastResultTimestamp());
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public <K, V> boolean replace(AtomicCacheEntry<K, V, Long> entry, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+        final byte[] rowIdBytes = serialize(entry.getKey(), keySerializer);
+        final byte[] valueBytes = serialize(entry.getValue(), valueSerializer);
+        final long revision = entry.getRevision().orElse(0L);
+        final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes);
+
+        // If the current revision is unset then only insert the row if it doesn't already exist.
+        return hBaseClientService.checkAndPut(hBaseCacheTableName, rowIdBytes, hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes, revision, putColumn);
 
 Review comment:
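   The `valueBytes` passed here should represent the originally stored value, i.e. the value the check is compared against. It looks like `valueBytes` and the value within `putColumn` are currently the same, so the check would compare against the new value rather than the old one.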
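
   To illustrate the compare-and-set semantics in question, here is a minimal in-memory sketch. It is not the NiFi `HBaseClientService` or the HBase client API: the class `CheckAndPutSketch` and its three-argument `checkAndPut(rowId, expected, newValue)` are hypothetical stand-ins, assumed only to mimic "write the new value if the stored cell equals the expected value".

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a single-cell check-and-put.
// This is an illustration only, not the NiFi or HBase API.
class CheckAndPutSketch {
    private final Map<String, byte[]> cells = new HashMap<>();

    // Writes newValue only if the stored value equals expected.
    // A null expected value means "the row must not exist yet".
    synchronized boolean checkAndPut(String rowId, byte[] expected, byte[] newValue) {
        final byte[] current = cells.get(rowId);
        final boolean matches = Arrays.equals(current, expected);
        if (matches) {
            cells.put(rowId, newValue.clone());
        }
        return matches;
    }

    // Returns a copy of the stored value, or null if the row is absent.
    synchronized byte[] get(String rowId) {
        final byte[] value = cells.get(rowId);
        return value == null ? null : value.clone();
    }

    public static void main(String[] args) {
        final CheckAndPutSketch table = new CheckAndPutSketch();
        final byte[] original = "v1".getBytes(StandardCharsets.UTF_8);
        final byte[] updated = "v2".getBytes(StandardCharsets.UTF_8);

        // Initial insert: succeeds because the row does not exist yet.
        System.out.println(table.checkAndPut("row", null, original)); // true

        // Pattern flagged in the review: the new value is also used as the
        // expected value, so the check fails while "v1" is still stored.
        System.out.println(table.checkAndPut("row", updated, updated)); // false

        // Intended pattern: compare against the originally fetched value.
        System.out.println(table.checkAndPut("row", original, updated)); // true
    }
}
```

   In this sketch the caller has to keep the originally fetched bytes (or a revision identifying them) separate from the bytes being written. How that maps onto `AtomicCacheEntry` and the `revision` argument in this PR is left to the actual implementation.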
