awelless commented on code in PR #10633:
URL: https://github.com/apache/nifi/pull/10633#discussion_r2611289740
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBuffer.java:
##########
@@ -152,28 +152,27 @@ public void checkpointEndedShard(final ShardBufferId bufferId, final RecordProce
@Override
public void shutdownShardConsumption(final ShardBufferId bufferId, final RecordProcessorCheckpointer checkpointer) {
- final ShardBuffer buffer = shardBuffers.get(bufferId);
+ final ShardBuffer buffer = shardBuffers.remove(bufferId);
+
if (buffer == null) {
logger.debug("Buffer with id {} not found. Cannot shutdown shard consumption", bufferId);
- return;
+ } else {
+ logger.debug("Shutting down the buffer {}. Checkpointing last consumed record", bufferId);
+ final Collection<RecordBatch> invalidatedBatches = buffer.shutdownBuffer(checkpointer);
+ memoryTracker.freeMemory(invalidatedBatches, bufferId);
Review Comment:
This line was previously missing, which caused the memory tracker to consider
the buffer full even when it held no data.
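
To illustrate the failure mode, here is a minimal sketch of a byte-counting tracker. It is not the actual NiFi memory tracker: the class `MemoryTrackerSketch` and its methods `tryReserve`, `free`, and `consumed` are hypothetical names, and the non-blocking reservation is a simplifying assumption.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-in for a memory tracker; not the NiFi class.
final class MemoryTrackerSketch {
    private final long maxBytes;
    private final AtomicLong consumedBytes = new AtomicLong();

    MemoryTrackerSketch(final long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Reserves bytes with a compare-and-set loop; returns false if the limit would be exceeded.
    boolean tryReserve(final long bytes) {
        while (true) {
            final long current = consumedBytes.get();
            final long updated = current + bytes;
            if (updated > maxBytes) {
                return false;
            }
            if (consumedBytes.compareAndSet(current, updated)) {
                return true;
            }
        }
    }

    // Returns previously reserved bytes to the pool.
    void free(final long bytes) {
        consumedBytes.addAndGet(-bytes);
    }

    long consumed() {
        return consumedBytes.get();
    }
}
```

With a 100-byte limit, reserving 80 bytes for a shard and then dropping that shard's buffer without calling `free(80)` makes a later `tryReserve(40)` fail, although no data is held anymore. Calling `free` on shutdown restores the capacity.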
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBuffer.java:
##########
@@ -352,8 +354,8 @@ void reserveMemory(final RecordBatch recordBatch) {
} else {
final long newConsumedBytes = currentlyConsumedBytes + consumedBytes;
if (consumedMemoryBytes.compareAndSet(currentlyConsumedBytes, newConsumedBytes)) {
- logger.debug("Reserved {} bytes for {} records. Total consumed memory: {} bytes",
- consumedBytes, recordBatch.size(), newConsumedBytes);
+ logger.debug("Reserved {} bytes for {} records for buffer {}. Total consumed memory: {} bytes",
+ consumedBytes, recordBatch.size(), bufferId, newConsumedBytes);
Review Comment:
Logging `bufferId` in `reserveMemory` and `freeMemory` allows us to
precisely track memory operations for each buffer in the logs.
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBuffer.java:
##########
@@ -585,35 +588,46 @@ void checkpointEndedShard(final RecordProcessorCheckpointer checkpointer) {
}
}
- void shutdownBuffer(final RecordProcessorCheckpointer checkpointer) {
- if (invalidated.get()) {
- return;
+ Collection<RecordBatch> shutdownBuffer(final RecordProcessorCheckpointer checkpointer) {
+ if (invalidated.getAndSet(true)) {
+ return emptyList();
}
if (batchesCount.get() == 0) {
checkpointLastReceivedRecord(checkpointer);
- } else {
- // If there are still records in the buffer, checkpointing with the latest provided checkpointer is not safe.
- // But, if the records were committed without checkpointing in the past, we can checkpoint them now.
- final LastIgnoredCheckpoint ignoredCheckpoint = this.lastIgnoredCheckpoint;
- if (ignoredCheckpoint != null) {
- checkpointSequenceNumber(
- ignoredCheckpoint.checkpointer(),
- ignoredCheckpoint.sequenceNumber(),
- ignoredCheckpoint.subSequenceNumber()
- );
- }
+ return emptyList();
+ }
+
+ // If there are still records in the buffer, checkpointing with the latest provided checkpointer is not safe.
+ // But, if the records were committed without checkpointing in the past, we can checkpoint them now.
+ final LastIgnoredCheckpoint ignoredCheckpoint = this.lastIgnoredCheckpoint;
+ if (ignoredCheckpoint != null) {
+ checkpointSequenceNumber(
+ ignoredCheckpoint.checkpointer(),
+ ignoredCheckpoint.sequenceNumber(),
+ ignoredCheckpoint.subSequenceNumber()
+ );
}
+
+ return drainInvalidatedBatches();
}
Collection<RecordBatch> invalidate() {
if (invalidated.getAndSet(true)) {
return emptyList();
}
+ return drainInvalidatedBatches();
+ }
+
+ private Collection<RecordBatch> drainInvalidatedBatches() {
+ if (!invalidated.get()) {
+ throw new IllegalStateException("Can't drain invalidated batches for valid shard buffer: " + bufferId);
+ }
Review Comment:
Just a safety check to make sure we perform this destructive action for
invalidated buffers only.
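
The pattern can be sketched in isolation as below. This is an illustrative reduction rather than the PR's code: `ShardBufferSketch`, its `String` batches, and the `add` method are hypothetical, while the `getAndSet(true)` guard and the `IllegalStateException` check mirror the diff above.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical reduction of a shard buffer; batches are plain strings here.
final class ShardBufferSketch {
    private final AtomicBoolean invalidated = new AtomicBoolean(false);
    private final Queue<String> batches = new ConcurrentLinkedQueue<>();

    void add(final String batch) {
        batches.add(batch);
    }

    // getAndSet(true) lets exactly one caller win; later callers get an empty result.
    Collection<String> invalidate() {
        if (invalidated.getAndSet(true)) {
            return Collections.emptyList();
        }
        return drainInvalidatedBatches();
    }

    // Destructive: removes every batch, so it refuses to run on a buffer that is still valid.
    private Collection<String> drainInvalidatedBatches() {
        if (!invalidated.get()) {
            throw new IllegalStateException("Can't drain batches of a valid buffer");
        }
        final List<String> drained = new ArrayList<>();
        String batch;
        while ((batch = batches.poll()) != null) {
            drained.add(batch);
        }
        return drained;
    }
}
```

Because only the first `invalidate()` call drains, the caller that receives the batches is the only one that frees their memory, so the same bytes are not released twice.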
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBuffer.java:
##########
@@ -683,6 +697,9 @@ private void checkpointSafely(final CheckpointAction checkpointAction) {
} catch (final ShutdownException e) {
logger.warn("Failed to checkpoint records due to shutdown. Ignoring checkpoint", e);
return;
+ } catch (final RuntimeException e) {
Review Comment:
It's essential to ensure no exception is thrown from the buffer methods, so
that `freeMemory` is always called.
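
A minimal sketch of why this matters, assuming a simplified byte counter: `SafeShutdownSketch` and its methods are hypothetical, and the checkpoint action is modelled as a plain `Runnable` rather than the PR's `CheckpointAction`.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: a failing checkpoint must not prevent the memory release.
final class SafeShutdownSketch {
    private final AtomicLong consumedBytes = new AtomicLong();

    void reserve(final long bytes) {
        consumedBytes.addAndGet(bytes);
    }

    long consumed() {
        return consumedBytes.get();
    }

    // Swallows runtime failures of the checkpoint; returns whether it succeeded.
    static boolean checkpointSafely(final Runnable checkpointAction) {
        try {
            checkpointAction.run();
            return true;
        } catch (final RuntimeException e) {
            // Real code would log the failure here; the sketch only reports it.
            return false;
        }
    }

    // The release below is reached even when the checkpoint action throws.
    void shutdown(final Runnable checkpointAction, final long bufferedBytes) {
        checkpointSafely(checkpointAction);
        consumedBytes.addAndGet(-bufferedBytes);
    }
}
```

If `checkpointSafely` let the exception propagate, `shutdown` would exit before the release and the counter would stay inflated for a buffer that no longer exists.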