awelless commented on code in PR #10633:
URL: https://github.com/apache/nifi/pull/10633#discussion_r2611289740


##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBuffer.java:
##########
@@ -152,28 +152,27 @@ public void checkpointEndedShard(final ShardBufferId 
bufferId, final RecordProce
 
     @Override
     public void shutdownShardConsumption(final ShardBufferId bufferId, final 
RecordProcessorCheckpointer checkpointer) {
-        final ShardBuffer buffer = shardBuffers.get(bufferId);
+        final ShardBuffer buffer = shardBuffers.remove(bufferId);
+
         if (buffer == null) {
             logger.debug("Buffer with id {} not found. Cannot shutdown shard 
consumption", bufferId);
-            return;
+        } else {
+            logger.debug("Shutting down the buffer {}. Checkpointing last 
consumed record", bufferId);
+            final Collection<RecordBatch> invalidatedBatches = 
buffer.shutdownBuffer(checkpointer);
+            memoryTracker.freeMemory(invalidatedBatches, bufferId);

Review Comment:
   This absent line was causing the memory tracker to think the buffer is full, 
even when no data was there.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBuffer.java:
##########
@@ -352,8 +354,8 @@ void reserveMemory(final RecordBatch recordBatch) {
                 } else {
                     final long newConsumedBytes = currentlyConsumedBytes + 
consumedBytes;
                     if 
(consumedMemoryBytes.compareAndSet(currentlyConsumedBytes, newConsumedBytes)) {
-                        logger.debug("Reserved {} bytes for {} records. Total 
consumed memory: {} bytes",
-                                consumedBytes, recordBatch.size(), 
newConsumedBytes);
+                        logger.debug("Reserved {} bytes for {} records for 
buffer {}. Total consumed memory: {} bytes",
+                                consumedBytes, recordBatch.size(), bufferId, 
newConsumedBytes);

Review Comment:
   Logging `bufferId` in `reserveMemory` and `freeMemory` allows us to 
precisely track memory operations for each buffer in the logs.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBuffer.java:
##########
@@ -585,35 +588,46 @@ void checkpointEndedShard(final 
RecordProcessorCheckpointer checkpointer) {
             }
         }
 
-        void shutdownBuffer(final RecordProcessorCheckpointer checkpointer) {
-            if (invalidated.get()) {
-                return;
+        Collection<RecordBatch> shutdownBuffer(final 
RecordProcessorCheckpointer checkpointer) {
+            if (invalidated.getAndSet(true)) {
+                return emptyList();
             }
 
             if (batchesCount.get() == 0) {
                 checkpointLastReceivedRecord(checkpointer);
-            } else {
-                // If there are still records in the buffer, checkpointing 
with the latest provided checkpointer is not safe.
-                // But, if the records were committed without checkpointing in 
the past, we can checkpoint them now.
-                final LastIgnoredCheckpoint ignoredCheckpoint = 
this.lastIgnoredCheckpoint;
-                if (ignoredCheckpoint != null) {
-                    checkpointSequenceNumber(
-                            ignoredCheckpoint.checkpointer(),
-                            ignoredCheckpoint.sequenceNumber(),
-                            ignoredCheckpoint.subSequenceNumber()
-                    );
-                }
+                return emptyList();
+            }
+
+            // If there are still records in the buffer, checkpointing with 
the latest provided checkpointer is not safe.
+            // But, if the records were committed without checkpointing in the 
past, we can checkpoint them now.
+            final LastIgnoredCheckpoint ignoredCheckpoint = 
this.lastIgnoredCheckpoint;
+            if (ignoredCheckpoint != null) {
+                checkpointSequenceNumber(
+                        ignoredCheckpoint.checkpointer(),
+                        ignoredCheckpoint.sequenceNumber(),
+                        ignoredCheckpoint.subSequenceNumber()
+                );
             }
+
+            return drainInvalidatedBatches();
         }
 
         Collection<RecordBatch> invalidate() {
             if (invalidated.getAndSet(true)) {
                 return emptyList();
             }
 
+            return drainInvalidatedBatches();
+        }
+
+        private Collection<RecordBatch> drainInvalidatedBatches() {
+            if (!invalidated.get()) {
+                throw new IllegalStateException("Can't drain invalidated 
batches for valid shard buffer: " + bufferId);
+            }

Review Comment:
   Just a safety check to make sure we perform this destructive action for 
invalidated buffers only.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBuffer.java:
##########
@@ -683,6 +697,9 @@ private void checkpointSafely(final CheckpointAction 
checkpointAction) {
                 } catch (final ShutdownException e) {
                     logger.warn("Failed to checkpoint records due to shutdown. 
Ignoring checkpoint", e);
                     return;
+                } catch (final RuntimeException e) {

Review Comment:
   It's essential to ensure no exception is thrown from the buffer methods, so 
that `freeMemory` is always called.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to