awelless commented on code in PR #10077: URL: https://github.com/apache/nifi/pull/10077#discussion_r2304063059
########## nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ReaderRecordProcessor.java: ########## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import software.amazon.kinesis.retrieval.KinesisClientRecord; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.Channels; +import 
java.util.ArrayList; +import java.util.List; + +import static java.util.Collections.emptyMap; +import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesisAttributes.RECORD_COUNT; +import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesisAttributes.RECORD_ERROR_MESSAGE; + +final class ReaderRecordProcessor { + + private final RecordReaderFactory recordReaderFactory; + private final RecordSetWriterFactory recordWriterFactory; + private final ComponentLog logger; + + ReaderRecordProcessor( + final RecordReaderFactory recordReaderFactory, + final RecordSetWriterFactory recordWriterFactory, + final ComponentLog logger) { + this.recordReaderFactory = recordReaderFactory; + this.recordWriterFactory = recordWriterFactory; + this.logger = logger; + } + + ProcessingResult processRecords( + final ProcessSession session, + final String streamName, + final String shardId, + final List<KinesisClientRecord> records) { + final List<FlowFile> successFlowFiles = new ArrayList<>(); + final List<FlowFile> failureFlowFiles = new ArrayList<>(); + + ActiveFlowFile activeFlowFile = null; + + for (final KinesisClientRecord kinesisRecord : records) { + final byte[] data = new byte[kinesisRecord.data().remaining()]; + kinesisRecord.data().get(data); + + try (final InputStream in = new ByteArrayInputStream(data); + final RecordReader reader = recordReaderFactory.createRecordReader(emptyMap(), in, data.length, logger)) { + + Record record; + while ((record = reader.nextRecord()) != null) { + final RecordSchema writeSchema = recordWriterFactory.getSchema(emptyMap(), record.getSchema()); + + if (activeFlowFile == null) { + activeFlowFile = ActiveFlowFile.startNewFile(logger, session, recordWriterFactory, writeSchema, streamName, shardId); + } else if (!writeSchema.equals(activeFlowFile.schema())) { + // If the write schema has changed, we need to complete the current FlowFile and start a new one. 
+ final FlowFile completedFlowFile = activeFlowFile.complete(); + successFlowFiles.add(completedFlowFile); + + activeFlowFile = ActiveFlowFile.startNewFile(logger, session, recordWriterFactory, writeSchema, streamName, shardId); + } + + activeFlowFile.writeRecord(record, kinesisRecord); + } + } catch (final IOException | MalformedRecordException | SchemaNotFoundException e) { + logger.error("Failed to parse record from Kinesis stream using configured Record Reader", e); + final FlowFile failureFlowFile = createParseFailureFlowFile(session, streamName, shardId, kinesisRecord, e); + failureFlowFiles.add(failureFlowFile); + } + } + + if (activeFlowFile != null) { + final FlowFile completedFlowFile = activeFlowFile.complete(); + successFlowFiles.add(completedFlowFile); + } + + return new ProcessingResult(successFlowFiles, failureFlowFiles); + } + + private static FlowFile createParseFailureFlowFile( + final ProcessSession session, + final String streamName, + final String shardId, + final KinesisClientRecord record, + final Exception e) { + FlowFile flowFile = session.create(); + + record.data().rewind(); + flowFile = session.write(flowFile, out -> Channels.newChannel(out).write(record.data())); + + flowFile = session.putAllAttributes(flowFile, ConsumeKinesisAttributes.fromKinesisRecord(streamName, shardId, record)); + + final Throwable cause = e.getCause() != null ? e.getCause() : e; + final String errorMessage = cause.getLocalizedMessage() != null ? 
cause.getLocalizedMessage() : cause.getClass().getCanonicalName() + " thrown"; + flowFile = session.putAttribute(flowFile, RECORD_ERROR_MESSAGE, errorMessage); + + return flowFile; + } + + record ProcessingResult(List<FlowFile> successFlowFiles, List<FlowFile> parseFailureFlowFiles) { + } + + private static final class ActiveFlowFile { + + private final ComponentLog logger; + + private final ProcessSession session; + private final FlowFile flowFile; + private final RecordSetWriter writer; + private final RecordSchema schema; + + private final String streamName; + private final String shardId; + + private KinesisClientRecord lastRecord; + + private ActiveFlowFile( + final ComponentLog logger, + final ProcessSession session, + final FlowFile flowFile, + final RecordSetWriter writer, + final RecordSchema schema, + final String streamName, + final String shardId) { + this.logger = logger; + this.session = session; + this.flowFile = flowFile; + this.writer = writer; + this.schema = schema; + this.streamName = streamName; + this.shardId = shardId; + } + + static ActiveFlowFile startNewFile( + final ComponentLog logger, + final ProcessSession session, + final RecordSetWriterFactory recordWriterFactory, + final RecordSchema writeSchema, + final String streamName, + final String shardId) throws SchemaNotFoundException { + final FlowFile flowFile = session.create(); + final OutputStream outputStream = session.write(flowFile); + + try { + final RecordSetWriter writer = recordWriterFactory.createWriter(logger, writeSchema, outputStream, flowFile); + writer.beginRecordSet(); + + return new ActiveFlowFile(logger, session, flowFile, writer, writeSchema, streamName, shardId); + + } catch (final SchemaNotFoundException e) { + logger.debug("Failed to find writeSchema for Kinesis stream record", e); + try { + outputStream.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close FlowFile output stream", ioe); Review Comment: I removed full exception logging, even 
for debug logs. I still believe, that during debugging the logs with exception messages might be valuable. Stacktraces will be logged on the top level error handling. ########## nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ReaderRecordProcessor.java: ########## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.aws.kinesis; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import software.amazon.kinesis.retrieval.KinesisClientRecord; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.Channels; +import java.util.ArrayList; +import java.util.List; + +import static java.util.Collections.emptyMap; +import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesisAttributes.RECORD_COUNT; +import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesisAttributes.RECORD_ERROR_MESSAGE; + +final class ReaderRecordProcessor { + + private final RecordReaderFactory recordReaderFactory; + private final RecordSetWriterFactory recordWriterFactory; + private final ComponentLog logger; + + ReaderRecordProcessor( + final RecordReaderFactory recordReaderFactory, + final RecordSetWriterFactory recordWriterFactory, + final ComponentLog logger) { + this.recordReaderFactory = recordReaderFactory; + this.recordWriterFactory = recordWriterFactory; + this.logger = logger; + } + + ProcessingResult processRecords( + final ProcessSession session, + final String streamName, + final String shardId, + final List<KinesisClientRecord> records) { + final List<FlowFile> successFlowFiles = new 
ArrayList<>(); + final List<FlowFile> failureFlowFiles = new ArrayList<>(); + + ActiveFlowFile activeFlowFile = null; + + for (final KinesisClientRecord kinesisRecord : records) { + final byte[] data = new byte[kinesisRecord.data().remaining()]; + kinesisRecord.data().get(data); + + try (final InputStream in = new ByteArrayInputStream(data); + final RecordReader reader = recordReaderFactory.createRecordReader(emptyMap(), in, data.length, logger)) { + + Record record; + while ((record = reader.nextRecord()) != null) { + final RecordSchema writeSchema = recordWriterFactory.getSchema(emptyMap(), record.getSchema()); + + if (activeFlowFile == null) { + activeFlowFile = ActiveFlowFile.startNewFile(logger, session, recordWriterFactory, writeSchema, streamName, shardId); + } else if (!writeSchema.equals(activeFlowFile.schema())) { + // If the write schema has changed, we need to complete the current FlowFile and start a new one. + final FlowFile completedFlowFile = activeFlowFile.complete(); + successFlowFiles.add(completedFlowFile); + + activeFlowFile = ActiveFlowFile.startNewFile(logger, session, recordWriterFactory, writeSchema, streamName, shardId); + } + + activeFlowFile.writeRecord(record, kinesisRecord); + } + } catch (final IOException | MalformedRecordException | SchemaNotFoundException e) { + logger.error("Failed to parse record from Kinesis stream using configured Record Reader", e); + final FlowFile failureFlowFile = createParseFailureFlowFile(session, streamName, shardId, kinesisRecord, e); + failureFlowFiles.add(failureFlowFile); + } + } + + if (activeFlowFile != null) { + final FlowFile completedFlowFile = activeFlowFile.complete(); + successFlowFiles.add(completedFlowFile); + } + + return new ProcessingResult(successFlowFiles, failureFlowFiles); + } + + private static FlowFile createParseFailureFlowFile( + final ProcessSession session, + final String streamName, + final String shardId, + final KinesisClientRecord record, + final Exception e) { + FlowFile 
flowFile = session.create(); + + record.data().rewind(); + flowFile = session.write(flowFile, out -> Channels.newChannel(out).write(record.data())); + + flowFile = session.putAllAttributes(flowFile, ConsumeKinesisAttributes.fromKinesisRecord(streamName, shardId, record)); + + final Throwable cause = e.getCause() != null ? e.getCause() : e; + final String errorMessage = cause.getLocalizedMessage() != null ? cause.getLocalizedMessage() : cause.getClass().getCanonicalName() + " thrown"; + flowFile = session.putAttribute(flowFile, RECORD_ERROR_MESSAGE, errorMessage); + + return flowFile; + } + + record ProcessingResult(List<FlowFile> successFlowFiles, List<FlowFile> parseFailureFlowFiles) { + } + + private static final class ActiveFlowFile { + + private final ComponentLog logger; + + private final ProcessSession session; + private final FlowFile flowFile; + private final RecordSetWriter writer; + private final RecordSchema schema; + + private final String streamName; + private final String shardId; + + private KinesisClientRecord lastRecord; + + private ActiveFlowFile( + final ComponentLog logger, + final ProcessSession session, + final FlowFile flowFile, + final RecordSetWriter writer, + final RecordSchema schema, + final String streamName, + final String shardId) { + this.logger = logger; + this.session = session; + this.flowFile = flowFile; + this.writer = writer; + this.schema = schema; + this.streamName = streamName; + this.shardId = shardId; + } + + static ActiveFlowFile startNewFile( + final ComponentLog logger, + final ProcessSession session, + final RecordSetWriterFactory recordWriterFactory, + final RecordSchema writeSchema, + final String streamName, + final String shardId) throws SchemaNotFoundException { + final FlowFile flowFile = session.create(); + final OutputStream outputStream = session.write(flowFile); + + try { + final RecordSetWriter writer = recordWriterFactory.createWriter(logger, writeSchema, outputStream, flowFile); + writer.beginRecordSet(); 
+ + return new ActiveFlowFile(logger, session, flowFile, writer, writeSchema, streamName, shardId); + + } catch (final SchemaNotFoundException e) { + logger.debug("Failed to find writeSchema for Kinesis stream record", e); + try { + outputStream.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close FlowFile output stream", ioe); + e.addSuppressed(ioe); + } + throw e; + + } catch (final IOException e) { + final ProcessException processException = new ProcessException("Failed to create a writer for a FlowFile", e); + + logger.debug("Failed to create a writer for a FlowFile. Stopping Kinesis records processing", e); + try { + outputStream.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close FlowFile output stream", ioe); + processException.addSuppressed(ioe); + } + throw processException; + } + } + + RecordSchema schema() { + return schema; + } + + void writeRecord(final Record record, final KinesisClientRecord kinesisRecord) { + try { + writer.write(record); + } catch (final IOException e) { + logger.debug("Failed to write to a FlowFile. Stopping Kinesis records processing", e); Review Comment: I removed full exception logging, even for debug logs. I still believe, that during debugging the logs with exception messages might be valuable. Stacktraces will be logged on the top level error handling. ########## nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ReaderRecordProcessor.java: ########## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import software.amazon.kinesis.retrieval.KinesisClientRecord; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.Channels; +import java.util.ArrayList; +import java.util.List; + +import static java.util.Collections.emptyMap; +import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesisAttributes.RECORD_COUNT; +import static org.apache.nifi.processors.aws.kinesis.ConsumeKinesisAttributes.RECORD_ERROR_MESSAGE; + +final class ReaderRecordProcessor { + + private final RecordReaderFactory recordReaderFactory; + private final RecordSetWriterFactory recordWriterFactory; + private final ComponentLog logger; + + ReaderRecordProcessor( + final RecordReaderFactory recordReaderFactory, + final 
RecordSetWriterFactory recordWriterFactory, + final ComponentLog logger) { + this.recordReaderFactory = recordReaderFactory; + this.recordWriterFactory = recordWriterFactory; + this.logger = logger; + } + + ProcessingResult processRecords( + final ProcessSession session, + final String streamName, + final String shardId, + final List<KinesisClientRecord> records) { + final List<FlowFile> successFlowFiles = new ArrayList<>(); + final List<FlowFile> failureFlowFiles = new ArrayList<>(); + + ActiveFlowFile activeFlowFile = null; Review Comment: I added docs on the `ActiveFlowFile` class. For me java docs are more readable than inline comments. What do you think? ########## nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBuffer.java: ########## @@ -0,0 +1,672 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.aws.kinesis; + +import jakarta.annotation.Nullable; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processors.aws.kinesis.RecordBuffer.ShardBufferId; +import org.apache.nifi.processors.aws.kinesis.RecordBuffer.ShardBufferLease; +import software.amazon.kinesis.exceptions.InvalidStateException; +import software.amazon.kinesis.exceptions.KinesisClientLibDependencyException; +import software.amazon.kinesis.exceptions.ShutdownException; +import software.amazon.kinesis.exceptions.ThrottlingException; +import software.amazon.kinesis.processor.RecordProcessorCheckpointer; +import software.amazon.kinesis.retrieval.KinesisClientRecord; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Queue; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import static java.util.Collections.emptyList; + +/** + * A record buffer which limits the maximum memory usage across all shard buffers. + * If the memory limit is reached, adding new records will block until enough memory is freed. 
+ */ +final class MemoryBoundRecordBuffer implements RecordBuffer.ForKinesisClientLibrary, RecordBuffer.ForProcessor { + + private final ComponentLog logger; + + private final long checkpointIntervalMillis; + private final BlockingMemoryTracker memoryTracker; + + private final AtomicLong bufferIdCounter = new AtomicLong(0); + private final ConcurrentMap<ShardBufferId, ShardBuffer> shardBuffers = new ConcurrentHashMap<>(); + + /** + * A queue with ids shard buffers available for leasing. + * <p> + * Note: when a buffer is invalidated its id is NOT removed from the queue immediately. + */ + private final Queue<ShardBufferId> buffersToLease = new ConcurrentLinkedQueue<>(); + + MemoryBoundRecordBuffer(final ComponentLog logger, final long maxMemoryBytes, final Duration checkpointInterval) { + this.logger = logger; + this.memoryTracker = new BlockingMemoryTracker(logger, maxMemoryBytes); + this.checkpointIntervalMillis = checkpointInterval.toMillis(); + } + + @Override + public ShardBufferId createBuffer(final String shardId) { + final ShardBufferId id = new ShardBufferId(shardId, bufferIdCounter.getAndIncrement()); + + logger.info("Creating new buffer for shard {} with id {}", shardId, id); + + shardBuffers.put(id, new ShardBuffer(id, logger, checkpointIntervalMillis)); + buffersToLease.add(id); + return id; + } + + @Override + public void addRecords(final ShardBufferId bufferId, final List<KinesisClientRecord> records, final RecordProcessorCheckpointer checkpointer) { + if (records.isEmpty()) { + return; + } + + final ShardBuffer buffer = shardBuffers.get(bufferId); + if (buffer == null) { + logger.debug("Buffer with id {} not found. 
Cannot add records with sequence and subsequence numbers: {}.{} - {}.{}", + bufferId, + records.getFirst().sequenceNumber(), + records.getFirst().subSequenceNumber(), + records.getLast().sequenceNumber(), + records.getLast().subSequenceNumber()); + return; + } + + final RecordBatch recordBatch = new RecordBatch(records, checkpointer, calculateMemoryUsage(records)); + memoryTracker.reserveMemory(recordBatch); + final boolean addedRecords = buffer.offer(recordBatch); + + if (addedRecords) { + logger.debug("Successfully added records with sequence and subsequence numbers: {}.{} - {}.{} to buffer with id {}", + records.getFirst().sequenceNumber(), + records.getFirst().subSequenceNumber(), + records.getLast().sequenceNumber(), + records.getLast().subSequenceNumber(), + bufferId); + } else { + logger.debug("Buffer with id {} was invalidated. Cannot add records with sequence and subsequence numbers: {}.{} - {}.{}", + bufferId, + records.getFirst().sequenceNumber(), + records.getFirst().subSequenceNumber(), + records.getLast().sequenceNumber(), + records.getLast().subSequenceNumber()); + // If the buffer was invalidated, we should free memory reserved for these records. + memoryTracker.freeMemory(List.of(recordBatch)); + } + } + + @Override + public void checkpointEndedShard(final ShardBufferId bufferId, final RecordProcessorCheckpointer checkpointer) { + final ShardBuffer buffer = shardBuffers.get(bufferId); + if (buffer == null) { + logger.debug("Buffer with id {} not found. Cannot checkpoint the ended shard", bufferId); + return; + } + + logger.info("Finishing consumption for buffer {}. 
Checkpointing the ended shard", bufferId); + buffer.checkpointEndedShard(checkpointer); + + logger.debug("Removing buffer with id {} after successful ended shard checkpoint", bufferId); + shardBuffers.remove(bufferId); + } + + @Override + public void shutdownShardConsumption(final ShardBufferId bufferId, final RecordProcessorCheckpointer checkpointer) { + final ShardBuffer buffer = shardBuffers.get(bufferId); + if (buffer == null) { + logger.debug("Buffer with id {} not found. Cannot shutdown shard consumption", bufferId); + return; + } + + logger.info("Shutting down the buffer {}. Checkpointing last consumed record", bufferId); + buffer.shutdownBuffer(checkpointer); + + logger.debug("Removing buffer with id {} after successful last consumed record checkpoint", bufferId); + shardBuffers.remove(bufferId); + } + + @Override + public void consumerLeaseLost(final ShardBufferId bufferId) { + final ShardBuffer buffer = shardBuffers.remove(bufferId); + + logger.info("Lease lost for buffer {}. Invalidating it", bufferId); + + if (buffer != null) { + final Collection<RecordBatch> invalidatedBatches = buffer.invalidate(); + memoryTracker.freeMemory(invalidatedBatches); + } + } + + @Override + public Optional<ShardBufferLease> acquireBufferLease() { + final Set<ShardBufferId> seenBuffers = new HashSet<>(); + + while (true) { + final ShardBufferId bufferId = buffersToLease.poll(); + if (bufferId == null) { + // The queue is empty or all buffers were seen already. Nothing to consume. + return Optional.empty(); + } + + if (seenBuffers.contains(bufferId)) { + // If the same buffer is seen again, there is a high chance we iterated through most of the buffers and didn't find any that isn't empty. + // To avoid burning CPU we return empty here, even if some buffer received records in the meantime. It will be picked up in the next iteration. 
+ buffersToLease.add(bufferId); + return Optional.empty(); + } + + final ShardBuffer buffer = shardBuffers.get(bufferId); + + if (buffer == null) { + // By the time the bufferId is polled, it might have been invalidated. No need to return it to the queue. + logger.debug("Buffer with id {} was removed while polling for lease. Continuing to poll.", bufferId); + } else if (buffer.isEmpty()) { + seenBuffers.add(bufferId); + buffersToLease.add(bufferId); + logger.debug("Buffer with id {} is empty. Continuing to poll.", bufferId); + } else { + logger.debug("Acquired lease for buffer {}", bufferId); + return Optional.of(new StandardShardBufferLease(bufferId)); + } + } + } + + @Override + public List<KinesisClientRecord> consumeRecords(final ShardBufferLease lease) { + if (!(lease instanceof StandardShardBufferLease l)) { + throw new IllegalArgumentException("Unexpected lease type: " + lease.getClass().getName()); + } + + if (l.returnedToPool.get()) { + logger.warn("Attempting to consume records from a buffer that was already returned to the pool. Ignoring."); + return emptyList(); + } + + final ShardBufferId bufferId = l.bufferId; + + final ShardBuffer buffer = shardBuffers.get(bufferId); + if (buffer == null) { + logger.debug("Buffer with id {} not found. Cannot consume records", bufferId); + return emptyList(); + } + + return buffer.consumeRecords(); + } + + @Override + public void commitConsumedRecords(final ShardBufferLease lease) { + if (!(lease instanceof StandardShardBufferLease l)) { + throw new IllegalArgumentException("Unexpected lease type: " + lease.getClass().getName()); + } + + if (l.returnedToPool.get()) { + logger.warn("Attempting to commit records from a buffer that was already returned to the pool. Ignoring."); + return; + } + + final ShardBufferId bufferId = l.bufferId; + + final ShardBuffer buffer = shardBuffers.get(bufferId); + if (buffer == null) { + logger.debug("Buffer with id {} not found. 
Cannot commit consumed records", bufferId); + return; + } + + final List<RecordBatch> consumedBatches = buffer.commitConsumedRecords(); + memoryTracker.freeMemory(consumedBatches); + } + + @Override + public void rollbackConsumedRecords(final ShardBufferLease lease) { + if (!(lease instanceof StandardShardBufferLease l)) { + throw new IllegalArgumentException("Unexpected lease type: " + lease.getClass().getName()); + } + + if (l.returnedToPool.get()) { + logger.warn("Attempting to rollback records from a buffer that was already returned to the pool. Ignoring."); + return; + } + + final ShardBufferId bufferId = l.bufferId; + final ShardBuffer buffer = shardBuffers.get(bufferId); + + if (buffer != null) { + buffer.rollbackConsumedRecords(); + } + } + + @Override + public void returnBufferLease(final ShardBufferLease lease) { + if (!(lease instanceof StandardShardBufferLease l)) { + throw new IllegalArgumentException("Unexpected lease type: " + lease.getClass().getName()); + } + + if (l.returnedToPool.getAndSet(true)) { + logger.warn("Attempting to return a buffer that was already returned to the pool. Ignoring."); + return; + } + + final ShardBufferId bufferId = l.bufferId; + buffersToLease.add(bufferId); + + logger.debug("The buffer {} is available for lease again", bufferId); + } + + private static class StandardShardBufferLease implements ShardBufferLease { + + private final ShardBufferId bufferId; + private final AtomicBoolean returnedToPool = new AtomicBoolean(false); + + StandardShardBufferLease(final ShardBufferId bufferId) { + this.bufferId = bufferId; + } + + @Override + public String shardId() { + return bufferId.shardId(); + } + } + + /** + * A memory tracker which blocks a thread when the memory usage exceeds the allowed maximum. + * <p> + * In order to make progress, the memory consumption may exceed the limit, but any new records will not be accepted. 
+ * This is done to support the case when a single record batch is larger than the allowed memory limit. + */ + private static class BlockingMemoryTracker { + + private static final long AWAIT_MILLIS = 100; + + private final ComponentLog logger; + + private final long maxMemoryBytes; + + private final AtomicLong consumedMemoryBytes = new AtomicLong(0); + /** + * Whenever memory is freed a latch opens. Then replaced with a new one. + */ + private final AtomicReference<CountDownLatch> memoryAvailableLatch = new AtomicReference<>(new CountDownLatch(1)); + + BlockingMemoryTracker(final ComponentLog logger, final long maxMemoryBytes) { + this.logger = logger; + this.maxMemoryBytes = maxMemoryBytes; + } + + void reserveMemory(final RecordBatch recordBatch) { + final long consumedBytes = recordBatch.batchSizeBytes(); + + if (consumedBytes == 0) { + return; + } + + while (true) { + final long currentlyConsumedBytes = consumedMemoryBytes.get(); + + if (currentlyConsumedBytes >= maxMemoryBytes) { + // Not enough memory available, need to wait. + try { + memoryAvailableLatch.get().await(AWAIT_MILLIS, TimeUnit.MILLISECONDS); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Thread interrupted while waiting for available memory in RecordBuffer", e); + } + } else { + final long newConsumedBytes = currentlyConsumedBytes + consumedBytes; + if (consumedMemoryBytes.compareAndSet(currentlyConsumedBytes, newConsumedBytes)) { + logger.debug("Reserved {} bytes for {} records. Total consumed memory: {} bytes", + consumedBytes, recordBatch.size(), newConsumedBytes); + break; + } + // If we're here, the compare and set operation failed, as another thread has modified the gauge in meantime. + // Retrying the operation. 
+ } + } + } + + void freeMemory(final Collection<RecordBatch> consumedBatches) { + if (consumedBatches.isEmpty()) { + return; + } + + long freedBytes = 0; + for (final RecordBatch batch : consumedBatches) { + freedBytes += batch.batchSizeBytes(); + } + + while (true) { + final long currentlyConsumedBytes = consumedMemoryBytes.get(); + if (currentlyConsumedBytes < freedBytes) { + throw new IllegalStateException("Attempting to free more memory than currently used"); + } + + final long newTotal = currentlyConsumedBytes - freedBytes; + if (consumedMemoryBytes.compareAndSet(currentlyConsumedBytes, newTotal)) { + logger.debug("Freed {} bytes for {} batches. Total consumed memory: {} bytes", + freedBytes, consumedBatches.size(), newTotal); + + final CountDownLatch oldLatch = memoryAvailableLatch.getAndSet(new CountDownLatch(1)); + oldLatch.countDown(); // Release any waiting threads for free memory. + break; + } + } + } + } + + private record RecordBatch(List<KinesisClientRecord> records, + @Nullable RecordProcessorCheckpointer checkpointer, + long batchSizeBytes) { + int size() { + return records.size(); + } + } + + private long calculateMemoryUsage(final Collection<KinesisClientRecord> records) { + long totalBytes = 0; + for (final KinesisClientRecord record : records) { + final ByteBuffer data = record.data(); + if (data != null) { + totalBytes += data.capacity(); + } + } + return totalBytes; + } + + /** + * ShardBuffer stores all record batches for a single shard in two queues: + * - IN_PROGRESS: record batches that have been consumed but not yet checkpointed. + * - PENDING: record batches that have been added but not yet consumed. + * <p> + * When consuming records all PENDING batches are moved to IN_PROGRESS. + * After a successful checkpoint all IN_PROGRESS batches are cleared. + * After a rollback all IN_PROGRESS batches are kept, allowing to retry consumption. 
+ * <p> + * Each batch preserves the original grouping of records as provided by Kinesis + * along with their associated checkpointer, ensuring atomicity. + */ + private static class ShardBuffer { + + private static final long AWAIT_MILLIS = 100; + + // Retry configuration. + private static final int MAX_RETRY_ATTEMPTS = 5; + private static final long BASE_RETRY_DELAY_MILLIS = 100; + private static final long MAX_RETRY_DELAY_MILLIS = 10_000; + private static final Random RANDOM = new Random(); + + private final ShardBufferId bufferId; + private final ComponentLog logger; + + private final long checkpointIntervalMillis; + private volatile long nextCheckpointTimeMillis; + /** + * A last records checkpointer that was ignored due to the checkpoint interval. + * If null, the last checkpoint was successful or no checkpoint was attempted yet. + */ + private volatile @Nullable RecordProcessorCheckpointer lastIgnoredCheckpointer = null; + + /** + * Queues for managing record batches with their checkpointers in different states. + */ + private final Queue<RecordBatch> inProgressBatches = new ConcurrentLinkedQueue<>(); + private final Queue<RecordBatch> pendingBatches = new ConcurrentLinkedQueue<>(); + /** + * Counter for tracking the number of batches in the buffer. Can be larger than the number of batches in the queues. + */ + private final AtomicInteger batchesCount = new AtomicInteger(0); + + /** + * A countdown latch that is used to signal when the buffer becomes empty. Used when ShardBuffer should be closed. 
+ */ + private volatile @Nullable CountDownLatch emptyBufferLatch = null; + private final AtomicBoolean invalidated = new AtomicBoolean(false); + + ShardBuffer(final ShardBufferId bufferId, final ComponentLog logger, final long checkpointIntervalMillis) { + this.bufferId = bufferId; + this.logger = logger; + this.checkpointIntervalMillis = checkpointIntervalMillis; + this.nextCheckpointTimeMillis = System.currentTimeMillis() + checkpointIntervalMillis; + } + + /** + * @param recordBatch record batch with records to add. + * @return true if the records were added successfully, false if a buffer was invalidated. + */ + boolean offer(final RecordBatch recordBatch) { + if (invalidated.get()) { + return false; + } + + // Batches count must be always equal to or larger than the number of batches in the queues. + // Thus, the ordering of the operations. + batchesCount.incrementAndGet(); + pendingBatches.offer(recordBatch); + + return true; + } + + List<KinesisClientRecord> consumeRecords() { + if (invalidated.get()) { + return emptyList(); + } + + RecordBatch pendingBatch; + while ((pendingBatch = pendingBatches.poll()) != null) { + inProgressBatches.offer(pendingBatch); + } + + final List<KinesisClientRecord> recordsToConsume = new ArrayList<>(); + for (final RecordBatch batch : inProgressBatches) { + recordsToConsume.addAll(batch.records()); + } + + return recordsToConsume; + } + + List<RecordBatch> commitConsumedRecords() { + if (invalidated.get()) { + return emptyList(); + } + + final List<RecordBatch> checkpointedBatches = new ArrayList<>(); + RecordBatch batch; + while ((batch = inProgressBatches.poll()) != null) { + checkpointedBatches.add(batch); + } + + if (checkpointedBatches.isEmpty()) { + // The buffer could be invalidated in the meantime, or no records were consumed. + return emptyList(); + } + + // Batches count must always be equal to or larger than the number of batches in the queues. 
+ // To achieve so, the count is decreased only after the queue has been emptied. + batchesCount.addAndGet(-checkpointedBatches.size()); + + final RecordProcessorCheckpointer lastBatchCheckpointer = checkpointedBatches.getLast().checkpointer(); + if (System.currentTimeMillis() >= nextCheckpointTimeMillis) { + checkpointSafely(lastBatchCheckpointer); + nextCheckpointTimeMillis = System.currentTimeMillis() + checkpointIntervalMillis; + lastIgnoredCheckpointer = null; + } else { + // Saving the checkpointer for later, in case shutdown happens before the next checkpoint. + lastIgnoredCheckpointer = lastBatchCheckpointer; + } + + final CountDownLatch localEmptyBufferLatch = this.emptyBufferLatch; + if (localEmptyBufferLatch != null && isEmpty()) { + // If the latch is not null, it means we are waiting for the buffer to become empty. + localEmptyBufferLatch.countDown(); + } + + return checkpointedBatches; + } + + void rollbackConsumedRecords() { + if (invalidated.get()) { + return; + } + + for (final RecordBatch recordBatch : inProgressBatches) { + for (final KinesisClientRecord record : recordBatch.records()) { + record.data().rewind(); + } + } + } + + void checkpointEndedShard(final RecordProcessorCheckpointer checkpointer) { Review Comment: Leaving it for other reviewers. [AWS Kinesis Stream Resharding](https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-resharding.html). Kinesis allows us to change the number of Shards in a Stream in both directions. When this happens, either one Shard is split into two Shards, or two Shards are merged into one Shard. To preserve the message ordering across resharding operations, child Shards are not consumed unless their parents are entirely consumed. [KCL names this lifecycle hook](https://docs.aws.amazon.com/streams/latest/dev/develop-kcl-consumers-java.html#:~:text=in%20this%20method.-,shardEnded(ShardEndedInput%20shardEndedInput),-Purpose%3A%20Finish%20processing) as `shardEnded`. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
