markap14 commented on code in PR #10964:
URL: https://github.com/apache/nifi/pull/10964#discussion_r2907173203
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -347,545 +316,1121 @@ Specifies the string (interpreted as UTF-8) to use for
demarcating multiple Kine
private static final Set<Relationship> RAW_FILE_RELATIONSHIPS =
Set.of(REL_SUCCESS);
private static final Set<Relationship> RECORD_FILE_RELATIONSHIPS =
Set.of(REL_SUCCESS, REL_PARSE_FAILURE);
- private volatile DynamoDbAsyncClient dynamoDbClient;
- private volatile CloudWatchAsyncClient cloudWatchClient;
- private volatile KinesisAsyncClient kinesisClient;
- private volatile Scheduler kinesisScheduler;
-
+ private volatile SdkHttpClient kinesisHttpClient;
+ private volatile SdkHttpClient dynamoHttpClient;
+ private volatile KinesisClient kinesisClient;
+ private volatile DynamoDbClient dynamoDbClient;
+ private volatile SdkAsyncHttpClient asyncHttpClient;
+ private volatile KinesisShardManager shardManager;
+ private volatile KinesisConsumerClient consumerClient;
private volatile String streamName;
- private volatile RecordBuffer.ForProcessor<Lease> recordBuffer;
-
- private volatile @Nullable ReaderRecordProcessor readerRecordProcessor;
- private volatile @Nullable byte[] demarcatorValue;
+ private volatile int maxRecordsPerRequest;
+ private volatile String initialStreamPosition;
+ private volatile long maxBatchNanos;
+ private volatile long maxBatchBytes;
- private volatile Future<InitializationResult> initializationResultFuture;
- private final AtomicBoolean initialized = new AtomicBoolean();
-
- // An instance filed, so that it can be read in getRelationships.
- private volatile ProcessingStrategy processingStrategy =
ProcessingStrategy.from(
- PROCESSING_STRATEGY.getDefaultValue());
+ private volatile ProcessingStrategy processingStrategy =
ProcessingStrategy.valueOf(PROCESSING_STRATEGY.getDefaultValue());
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
}
- @Override
- public void migrateProperties(final PropertyConfiguration config) {
- ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);
- }
-
@Override
public Set<Relationship> getRelationships() {
return switch (processingStrategy) {
- case FLOW_FILE, DEMARCATOR -> RAW_FILE_RELATIONSHIPS;
+ case FLOW_FILE, LINE_DELIMITED, DEMARCATOR ->
RAW_FILE_RELATIONSHIPS;
case RECORD -> RECORD_FILE_RELATIONSHIPS;
};
}
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final
String oldValue, final String newValue) {
if (descriptor.equals(PROCESSING_STRATEGY)) {
- processingStrategy = ProcessingStrategy.from(newValue);
+ processingStrategy = ProcessingStrategy.valueOf(newValue);
}
}
- @OnScheduled
- public void setup(final ProcessContext context) {
- readerRecordProcessor = switch (processingStrategy) {
- case FLOW_FILE, DEMARCATOR -> null;
- case RECORD -> createReaderRecordProcessor(context);
- };
- demarcatorValue = switch (processingStrategy) {
- case FLOW_FILE, RECORD -> null;
- case DEMARCATOR -> {
- final String demarcatorValue =
context.getProperty(MESSAGE_DEMARCATOR).getValue();
- yield demarcatorValue != null ?
demarcatorValue.getBytes(UTF_8) : new byte[0];
- }
- };
+ @Override
+ public void migrateProperties(final PropertyConfiguration config) {
+ config.renameProperty("Max Bytes to Buffer", "Max Batch Size");
+ config.removeProperty("Checkpoint Interval");
+ config.removeProperty("Metrics Publishing");
+ }
+
+ @Override
+ public void migrateRelationships(final RelationshipConfiguration config) {
+ config.renameRelationship("parse failure", "parse.failure");
+ }
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
final Region region = RegionUtil.getRegion(context);
final AwsCredentialsProvider credentialsProvider =
context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE)
.asControllerService(AwsCredentialsProviderService.class).getAwsCredentialsProvider();
+ final String endpointOverride =
context.getProperty(ENDPOINT_OVERRIDE).getValue();
- kinesisClient = KinesisAsyncClient.builder()
- .region(region)
- .credentialsProvider(credentialsProvider)
- .endpointOverride(getKinesisEndpointOverride())
- .httpClient(createKinesisHttpClient(context))
+ final ClientOverrideConfiguration clientConfig =
ClientOverrideConfiguration.builder()
+ .apiCallTimeout(API_CALL_TIMEOUT)
+ .apiCallAttemptTimeout(API_CALL_ATTEMPT_TIMEOUT)
.build();
- dynamoDbClient = DynamoDbAsyncClient.builder()
+ final KinesisClientBuilder kinesisBuilder = KinesisClient.builder()
.region(region)
.credentialsProvider(credentialsProvider)
- .endpointOverride(getDynamoDbEndpointOverride())
- .httpClient(createHttpClientBuilder(context).build())
- .build();
+ .overrideConfiguration(clientConfig);
- cloudWatchClient = CloudWatchAsyncClient.builder()
+ final DynamoDbClientBuilder dynamoBuilder = DynamoDbClient.builder()
.region(region)
.credentialsProvider(credentialsProvider)
- .endpointOverride(getCloudwatchEndpointOverride())
- .httpClient(createHttpClientBuilder(context).build())
- .build();
-
- streamName = context.getProperty(STREAM_NAME).getValue();
- final InitialPositionInStreamExtended initialPositionExtended =
getInitialPosition(context);
- final SingleStreamTracker streamTracker = new
SingleStreamTracker(streamName, initialPositionExtended);
-
- final long maxBytesToBuffer =
context.getProperty(MAX_BYTES_TO_BUFFER).asDataSize(DataUnit.B).longValue();
- final Duration checkpointInterval =
context.getProperty(CHECKPOINT_INTERVAL).asDuration();
- final MemoryBoundRecordBuffer memoryBoundRecordBuffer = new
MemoryBoundRecordBuffer(getLogger(), maxBytesToBuffer, checkpointInterval);
- recordBuffer = memoryBoundRecordBuffer;
- final ShardRecordProcessorFactory recordProcessorFactory = () -> new
ConsumeKinesisRecordProcessor(memoryBoundRecordBuffer);
-
- final String applicationName =
context.getProperty(APPLICATION_NAME).getValue();
- final String workerId = generateWorkerId();
- final ConfigsBuilder configsBuilder = new
ConfigsBuilder(streamTracker, applicationName, kinesisClient, dynamoDbClient,
cloudWatchClient, workerId, recordProcessorFactory);
-
- final MetricsFactory metricsFactory = configureMetricsFactory(context);
- final RetrievalSpecificConfig retrievalSpecificConfig =
configureRetrievalSpecificConfig(context, kinesisClient, streamName,
applicationName);
-
- final InitializationStateChangeListener initializationListener = new
InitializationStateChangeListener(getLogger());
- initialized.set(false);
- initializationResultFuture = initializationListener.result();
-
- kinesisScheduler = new Scheduler(
- configsBuilder.checkpointConfig(),
-
configsBuilder.coordinatorConfig().workerStateChangeListener(initializationListener),
- configsBuilder.leaseManagementConfig(),
- configsBuilder.lifecycleConfig(),
- configsBuilder.metricsConfig().metricsFactory(metricsFactory),
- configsBuilder.processorConfig(),
-
configsBuilder.retrievalConfig().retrievalSpecificConfig(retrievalSpecificConfig)
- );
-
- final String schedulerThreadName =
"%s-Scheduler-%s".formatted(getClass().getSimpleName(), getIdentifier());
- final Thread schedulerThread = new Thread(kinesisScheduler,
schedulerThreadName);
- schedulerThread.setDaemon(true);
- schedulerThread.start();
- // The thread is stopped when kinesisScheduler is shutdown in the
onStopped method.
+ .overrideConfiguration(clientConfig);
- try {
- final InitializationResult result = initializationResultFuture.get(
-
KINESIS_SCHEDULER_ON_SCHEDULED_INITIALIZATION_TIMEOUT.getSeconds(), SECONDS);
- checkInitializationResult(result);
- } catch (final TimeoutException e) {
- // During a first run the processor will take more time to
initialize. We return from OnSchedule and continue waiting in the onTrigger
method.
- getLogger().warn("Kinesis Scheduler initialization may take up to
10 minutes on a first run, which is caused by AWS resources initialization");
- } catch (final InterruptedException | ExecutionException e) {
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- cleanUpState();
- throw new ProcessException("Initialization failed for stream
[%s]".formatted(streamName), e);
+ if (endpointOverride != null && !endpointOverride.isEmpty()) {
+ final URI endpointUri = URI.create(endpointOverride);
+ kinesisBuilder.endpointOverride(endpointUri);
+ dynamoBuilder.endpointOverride(endpointUri);
}
- }
- /**
- * Creating Kinesis HTTP client, as per
- * {@link
software.amazon.kinesis.common.KinesisClientUtil#adjustKinesisClientBuilder(KinesisAsyncClientBuilder)}.
- */
- private static SdkAsyncHttpClient createKinesisHttpClient(final
ProcessContext context) {
- return createHttpClientBuilder(context)
- .protocol(Protocol.HTTP2)
- // Since we're using HTTP/2, multiple concurrent requests will
reuse the same HTTP connection.
- // Therefore, the number of real connections is going to be
relatively small.
- .maxConcurrency(Integer.MAX_VALUE)
- .http2Configuration(Http2Configuration.builder()
-
.initialWindowSize(KINESIS_HTTP_CLIENT_WINDOW_SIZE_BYTES)
-
.healthCheckPingPeriod(KINESIS_HTTP_HEALTH_CHECK_PERIOD)
- .build())
- .build();
- }
+ final ProxyConfiguration proxyConfig =
ProxyConfiguration.getConfiguration(context);
- private static NettyNioAsyncHttpClient.Builder
createHttpClientBuilder(final ProcessContext context) {
- final NettyNioAsyncHttpClient.Builder builder =
NettyNioAsyncHttpClient.builder()
- .connectionTimeout(HTTP_CLIENTS_CONNECTION_TIMEOUT)
- .readTimeout(HTTP_CLIENTS_READ_TIMEOUT);
+ kinesisHttpClient = buildApacheHttpClient(proxyConfig,
PollingKinesisClient.MAX_CONCURRENT_FETCHES + 10);
+ dynamoHttpClient = buildApacheHttpClient(proxyConfig, 50);
+ kinesisBuilder.httpClient(kinesisHttpClient);
+ dynamoBuilder.httpClient(dynamoHttpClient);
+
+ kinesisClient = kinesisBuilder.build();
+ dynamoDbClient = dynamoBuilder.build();
+
+ final String checkpointTableName =
context.getProperty(APPLICATION_NAME).getValue();
+ streamName = context.getProperty(STREAM_NAME).getValue();
+ maxRecordsPerRequest =
context.getProperty(MAX_RECORDS_PER_REQUEST).asInteger();
+ initialStreamPosition =
context.getProperty(INITIAL_STREAM_POSITION).getValue();
+ maxBatchNanos =
context.getProperty(MAX_BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS);
+ maxBatchBytes =
context.getProperty(MAX_BATCH_SIZE).asDataSize(DataUnit.B).longValue();
+
+ shardManager = createShardManager(kinesisClient, dynamoDbClient,
getLogger(), checkpointTableName, streamName);
+ shardManager.ensureCheckpointTableExists();
+
+ final boolean efoMode =
ConsumerType.ENHANCED_FAN_OUT.equals(context.getProperty(CONSUMER_TYPE).asAllowableValue(ConsumerType.class));
+ consumerClient = createConsumerClient(kinesisClient, getLogger(),
efoMode);
+
+ final Instant timestampForPosition = resolveTimestampPosition(context);
+ if (timestampForPosition != null) {
+ if (consumerClient instanceof PollingKinesisClient polling) {
+ polling.setTimestampForInitialPosition(timestampForPosition);
+ } else if (consumerClient instanceof EfoKinesisClient efo) {
+ efo.setTimestampForInitialPosition(timestampForPosition);
+ }
+ }
- final ProxyConfigurationService proxyConfigService =
context.getProperty(PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
- if (proxyConfigService != null) {
- final ProxyConfiguration proxyConfig =
proxyConfigService.getConfiguration();
+ if (efoMode) {
+ final NettyNioAsyncHttpClient.Builder nettyBuilder =
NettyNioAsyncHttpClient.builder()
+ .maxConcurrency(500)
+ .connectionAcquisitionTimeout(Duration.ofSeconds(60));
- final
software.amazon.awssdk.http.nio.netty.ProxyConfiguration.Builder
proxyConfigBuilder =
software.amazon.awssdk.http.nio.netty.ProxyConfiguration.builder()
+ if (Proxy.Type.HTTP.equals(proxyConfig.getProxyType())) {
+ final
software.amazon.awssdk.http.nio.netty.ProxyConfiguration.Builder
nettyProxyBuilder =
software.amazon.awssdk.http.nio.netty.ProxyConfiguration.builder()
.host(proxyConfig.getProxyServerHost())
.port(proxyConfig.getProxyServerPort());
- if (proxyConfig.hasCredential()) {
- proxyConfigBuilder.username(proxyConfig.getProxyUserName());
-
proxyConfigBuilder.password(proxyConfig.getProxyUserPassword());
- }
+ if (proxyConfig.hasCredential()) {
+ nettyProxyBuilder.username(proxyConfig.getProxyUserName());
+
nettyProxyBuilder.password(proxyConfig.getProxyUserPassword());
+ }
- builder.proxyConfiguration(proxyConfigBuilder.build());
- }
+ nettyBuilder.proxyConfiguration(nettyProxyBuilder.build());
+ }
- return builder;
- }
+ asyncHttpClient = nettyBuilder.build();
- private ReaderRecordProcessor createReaderRecordProcessor(final
ProcessContext context) {
- final RecordReaderFactory recordReaderFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
- final RecordSetWriterFactory recordWriterFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+ final KinesisAsyncClientBuilder asyncBuilder =
KinesisAsyncClient.builder()
+ .region(region)
+ .credentialsProvider(credentialsProvider)
+ .httpClient(asyncHttpClient);
- final OutputStrategy outputStrategy =
context.getProperty(OUTPUT_STRATEGY).asAllowableValue(OutputStrategy.class);
- final KinesisRecordConverter converter = switch (outputStrategy) {
- case USE_VALUE -> new ValueRecordConverter();
- case USE_WRAPPER -> new WrapperRecordConverter();
- case INJECT_METADATA -> new InjectMetadataRecordConverter();
- };
+ if (endpointOverride != null && !endpointOverride.isEmpty()) {
+ asyncBuilder.endpointOverride(URI.create(endpointOverride));
+ }
- return new ReaderRecordProcessor(recordReaderFactory, converter,
recordWriterFactory, getLogger());
+ final String consumerName =
context.getProperty(APPLICATION_NAME).getValue();
+ consumerClient.initialize(asyncBuilder.build(), streamName,
consumerName);
+ }
}
- private static InitialPositionInStreamExtended getInitialPosition(final
ProcessContext context) {
- final InitialPosition initialPosition =
context.getProperty(INITIAL_STREAM_POSITION).asAllowableValue(InitialPosition.class);
- return switch (initialPosition) {
- case TRIM_HORIZON ->
-
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
- case LATEST ->
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
- case AT_TIMESTAMP -> {
- final String timestampValue =
context.getProperty(STREAM_POSITION_TIMESTAMP).getValue();
- final Instant timestamp = Instant.parse(timestampValue);
- yield
InitialPositionInStreamExtended.newInitialPositionAtTimestamp(Date.from(timestamp));
- }
- };
+ private static Instant resolveTimestampPosition(final ProcessContext
context) {
+ final InitialPosition position =
context.getProperty(INITIAL_STREAM_POSITION).asAllowableValue(InitialPosition.class);
+ if (position == InitialPosition.AT_TIMESTAMP) {
+ return
Instant.parse(context.getProperty(STREAM_POSITION_TIMESTAMP).getValue());
+ }
+ return null;
}
- private String generateWorkerId() {
- final String processorId = getIdentifier();
- final NodeTypeProvider nodeTypeProvider = getNodeTypeProvider();
-
- final String workerId;
+ /**
+ * Builds an {@link ApacheHttpClient} with the given connection pool size
and optional proxy
+ * configuration. Each AWS service client (Kinesis, DynamoDB) should
receive its own HTTP client
+ * so their connection pools are isolated and cannot starve each other
under high shard counts.
+ */
+ private static SdkHttpClient buildApacheHttpClient(final
ProxyConfiguration proxyConfig, final int maxConnections) {
+ final ApacheHttpClient.Builder builder = ApacheHttpClient.builder()
+ .maxConnections(maxConnections);
- if (nodeTypeProvider.isClustered()) {
- // If a node id is not available for some reason, generating a
random UUID helps to avoid collisions.
- final String nodeId =
nodeTypeProvider.getCurrentNode().orElse(UUID.randomUUID().toString());
- workerId = "%s@%s".formatted(processorId, nodeId);
- } else {
- workerId = processorId;
- }
+ if (Proxy.Type.HTTP.equals(proxyConfig.getProxyType())) {
+ final URI proxyEndpoint = URI.create(String.format("http://%s:%s",
proxyConfig.getProxyServerHost(), proxyConfig.getProxyServerPort()));
+ final
software.amazon.awssdk.http.apache.ProxyConfiguration.Builder proxyBuilder =
+
software.amazon.awssdk.http.apache.ProxyConfiguration.builder().endpoint(proxyEndpoint);
- return workerId;
- }
+ if (proxyConfig.hasCredential()) {
+ proxyBuilder.username(proxyConfig.getProxyUserName());
+ proxyBuilder.password(proxyConfig.getProxyUserPassword());
+ }
- private static @Nullable MetricsFactory configureMetricsFactory(final
ProcessContext context) {
- final MetricsPublishing metricsPublishing =
context.getProperty(METRICS_PUBLISHING).asAllowableValue(MetricsPublishing.class);
- return switch (metricsPublishing) {
- case DISABLED -> new NullMetricsFactory();
- case LOGS -> new LogMetricsFactory();
- case CLOUDWATCH -> null; // If no metrics factory was provided,
CloudWatch metrics factory is used by default.
- };
- }
+ builder.proxyConfiguration(proxyBuilder.build());
+ }
- private static RetrievalSpecificConfig configureRetrievalSpecificConfig(
- final ProcessContext context,
- final KinesisAsyncClient kinesisClient,
- final String streamName,
- final String applicationName) {
- final ConsumerType consumerType =
context.getProperty(CONSUMER_TYPE).asAllowableValue(ConsumerType.class);
- return switch (consumerType) {
- case SHARED_THROUGHPUT -> new
PollingConfig(kinesisClient).streamName(streamName);
- case ENHANCED_FAN_OUT -> new
FanOutConfig(kinesisClient).streamName(streamName).applicationName(applicationName);
- };
+ return builder.build();
}
@OnStopped
public void onStopped() {
- cleanUpState();
+ if (shardManager != null) {
+ shardManager.releaseAllLeases();
+ shardManager.close();
+ shardManager = null;
+ }
- initialized.set(false);
- initializationResultFuture = null;
- }
+ if (consumerClient != null) {
+ consumerClient.close();
+ consumerClient = null;
+ }
- private void cleanUpState() {
- if (kinesisScheduler != null) {
- shutdownScheduler();
- kinesisScheduler = null;
+ if (asyncHttpClient != null) {
+ asyncHttpClient.close();
+ asyncHttpClient = null;
}
if (kinesisClient != null) {
kinesisClient.close();
kinesisClient = null;
}
+
if (dynamoDbClient != null) {
dynamoDbClient.close();
dynamoDbClient = null;
}
- if (cloudWatchClient != null) {
- cloudWatchClient.close();
- cloudWatchClient = null;
- }
- recordBuffer = null;
- readerRecordProcessor = null;
- demarcatorValue = null;
+ closeQuietly(kinesisHttpClient);
+ kinesisHttpClient = null;
+ closeQuietly(dynamoHttpClient);
+ dynamoHttpClient = null;
}
- private void shutdownScheduler() {
- if (kinesisScheduler.shutdownComplete()) {
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ final int clusterMemberCount = Math.max(1,
getNodeTypeProvider().getClusterMembers().size());
+ shardManager.refreshLeasesIfNecessary(clusterMemberCount);
+ final List<Shard> ownedShards = shardManager.getOwnedShards();
+
+ if (ownedShards.isEmpty()) {
+ context.yield();
return;
}
- final long start = System.nanoTime();
- getLogger().debug("Shutting down Kinesis Scheduler");
+ final Set<String> ownedShardIds = new HashSet<>();
+ for (final Shard shard : ownedShards) {
+ ownedShardIds.add(shard.shardId());
+ }
+
+ consumerClient.removeUnownedShards(ownedShardIds);
+ consumerClient.startFetches(ownedShards, streamName,
maxRecordsPerRequest, initialStreamPosition, shardManager);
+ consumerClient.logDiagnostics(ownedShards.size(),
shardManager.getCachedShardCount());
+
+ final Set<String> claimedShards = new HashSet<>();
+ final List<ShardFetchResult> consumed = consumeRecords(claimedShards);
+ final List<ShardFetchResult> accepted =
discardRelinquishedResults(consumed, claimedShards);
+
+ if (accepted.isEmpty()) {
+ consumerClient.releaseShards(claimedShards);
+ context.yield();
+ return;
+ }
- boolean gracefulShutdownSucceeded;
+ final PartitionedBatch batch = partitionByShardAndCheckpoint(accepted);
+
+ final WriteResult output;
try {
- gracefulShutdownSucceeded =
kinesisScheduler.startGracefulShutdown().get(KINESIS_SCHEDULER_GRACEFUL_SHUTDOWN_TIMEOUT.getSeconds(),
SECONDS);
- if (!gracefulShutdownSucceeded) {
- getLogger().warn("Failed to shutdown Kinesis Scheduler
gracefully. See the logs for more details");
- }
- } catch (final RuntimeException | InterruptedException |
ExecutionException | TimeoutException e) {
- if (e instanceof TimeoutException) {
- getLogger().warn("Failed to shutdown Kinesis Scheduler
gracefully after {} seconds",
KINESIS_SCHEDULER_GRACEFUL_SHUTDOWN_TIMEOUT.getSeconds(), e);
- } else {
- getLogger().warn("Failed to shutdown Kinesis Scheduler
gracefully", e);
- }
- gracefulShutdownSucceeded = false;
+ output = writeResults(session, context, batch.resultsByShard());
+ } catch (final Exception e) {
+ handleWriteFailure(e, accepted, claimedShards, context);
+ return;
}
- if (!gracefulShutdownSucceeded) {
- kinesisScheduler.shutdown();
+ if (output.produced().isEmpty() && output.parseFailures().isEmpty()) {
+ consumerClient.releaseShards(claimedShards);
+ context.yield();
+ return;
}
- final long finish = System.nanoTime();
- getLogger().debug("Kinesis Scheduler shutdown finished after {}
seconds", NANOSECONDS.toSeconds(finish - start));
+ session.transfer(output.produced(), REL_SUCCESS);
+ if (!output.parseFailures().isEmpty()) {
+ session.transfer(output.parseFailures(), REL_PARSE_FAILURE);
+ session.adjustCounter("Records Parse Failure",
output.parseFailures().size(), false);
+ }
+ session.adjustCounter("Records Consumed", output.totalRecordCount(),
false);
+ final long dedupEvents = consumerClient.drainDeduplicatedEventCount();
+ if (dedupEvents > 0) {
+ session.adjustCounter("EFO Deduplicated Events", dedupEvents,
false);
+ }
+
+ session.commitAsync(
+ () -> {
+ try {
+ shardManager.writeCheckpoints(batch.checkpoints());
+ consumerClient.acknowledgeResults(accepted);
+ } finally {
+ consumerClient.releaseShards(claimedShards);
+ }
+ },
+ failure -> {
+ try {
+ getLogger().error("Session commit failed; resetting
shard iterators for re-consumption", failure);
+ consumerClient.rollbackResults(accepted);
+ } finally {
+ consumerClient.releaseShards(claimedShards);
+ }
+ });
}
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
- if (!initialized.get()) {
- if (!initializationResultFuture.isDone()) {
- getLogger().debug("Waiting for Kinesis Scheduler to finish
initialization");
- context.yield();
- return;
+ private List<ShardFetchResult> discardRelinquishedResults(final
List<ShardFetchResult> consumedResults, final Set<String> claimedShards) {
+ final List<ShardFetchResult> accepted = new ArrayList<>();
+ final List<ShardFetchResult> discarded = new ArrayList<>();
+ for (final ShardFetchResult result : consumedResults) {
+ if (shardManager.shouldProcessFetchedResult(result.shardId())) {
+ accepted.add(result);
+ } else {
+ discarded.add(result);
}
+ }
- checkInitializationResult(initializationResultFuture.resultNow());
+ if (!discarded.isEmpty()) {
+ getLogger().debug("Discarding {} fetched shard result(s) for
relinquished shards", discarded.size());
+ consumerClient.rollbackResults(discarded);
+ for (final ShardFetchResult r : discarded) {
+ claimedShards.remove(r.shardId());
+ }
+
consumerClient.releaseShards(discarded.stream().map(ShardFetchResult::shardId).toList());
}
- final Optional<Lease> leaseAcquired =
recordBuffer.acquireBufferLease();
+ return accepted;
+ }
- leaseAcquired.ifPresentOrElse(
- lease -> processRecordsFromBuffer(session, lease),
- context::yield
- );
+ private PartitionedBatch partitionByShardAndCheckpoint(final
List<ShardFetchResult> accepted) {
+ final Map<String, List<ShardFetchResult>> resultsByShard = new
LinkedHashMap<>();
+ for (final ShardFetchResult result : accepted) {
+ resultsByShard.computeIfAbsent(result.shardId(), k -> new
ArrayList<>()).add(result);
+ }
+ for (final List<ShardFetchResult> shardResults :
resultsByShard.values()) {
+ shardResults.sort(Comparator.comparing(r -> new
BigInteger(r.firstSequenceNumber())));
+ }
+
+ final Map<String, ShardCheckpoint> checkpoints = new HashMap<>();
+ for (final ShardFetchResult result : accepted) {
+ final ShardCheckpoint incoming = new
ShardCheckpoint(result.lastSequenceNumber(), result.lastSubSequenceNumber());
+ checkpoints.merge(result.shardId(), incoming,
ShardCheckpoint::max);
+ }
+
+ return new PartitionedBatch(resultsByShard, checkpoints);
}
- private void checkInitializationResult(final InitializationResult
initializationResult) {
- switch (initializationResult) {
- case InitializationResult.Success ignored -> {
- final boolean wasInitialized = initialized.getAndSet(true);
- if (!wasInitialized) {
- getLogger().info(
- "Started Kinesis Scheduler for stream [{}] with
application name [{}] and workerId [{}]",
- streamName, kinesisScheduler.applicationName(),
kinesisScheduler.leaseManagementConfig().workerIdentifier());
+ private List<ShardFetchResult> consumeRecords(final Set<String>
claimedShards) {
+ final List<ShardFetchResult> results = new ArrayList<>();
+ final long startNanos = System.nanoTime();
+ long estimatedBytes = 0;
+
+ while (System.nanoTime() < startNanos + maxBatchNanos &&
estimatedBytes < maxBatchBytes) {
+ boolean foundAny = false;
+
+ for (final String shardId :
consumerClient.getShardIdsWithResults()) {
+ if (estimatedBytes >= maxBatchBytes) {
+ break;
+ }
+ if (!claimedShards.contains(shardId) &&
!consumerClient.claimShard(shardId)) {
+ continue;
+ }
+ claimedShards.add(shardId);
+
+ ShardFetchResult result;
+ while ((result = consumerClient.pollShardResult(shardId)) !=
null) {
+ results.add(result);
+ estimatedBytes += estimateResultBytes(result);
+ foundAny = true;
+ if (estimatedBytes >= maxBatchBytes) {
+ break;
+ }
}
}
- case InitializationResult.Failure failure -> {
- cleanUpState();
- final ProcessException ex = failure.error()
- .map(err -> new ProcessException("Initialization
failed for stream [%s]".formatted(streamName), err))
- // This branch is active only when a scheduler was
shutdown, but no initialization error was provided.
- // This behavior isn't typical and wasn't observed.
- .orElseGet(() -> new ProcessException("Initialization
failed for stream [%s]".formatted(streamName)));
+ if (!foundAny) {
+ if (!consumerClient.hasPendingFetches()) {
+ break;
+ }
- throw ex;
+ try {
+ consumerClient.awaitResults(QUEUE_POLL_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS);
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
}
}
+
+ return results;
+ }
+
+ private void handleWriteFailure(final Exception cause, final
List<ShardFetchResult> accepted,
+ final Set<String> claimedShards, final
ProcessContext context) {
+ getLogger().error("Failed to write consumed Kinesis records", cause);
+ consumerClient.rollbackResults(accepted);
+ consumerClient.releaseShards(claimedShards);
+ context.yield();
}
- private void processRecordsFromBuffer(final ProcessSession session, final
Lease lease) {
+ private WriteResult writeResults(final ProcessSession session, final
ProcessContext context,
+ final Map<String, List<ShardFetchResult>>
resultsByShard) {
+ final List<FlowFile> produced = new ArrayList<>();
+ final List<FlowFile> parseFailures = new ArrayList<>();
+ long totalRecordCount = 0;
+ long totalBytesConsumed = 0;
+ long maxMillisBehind = -1;
+
try {
- final List<KinesisClientRecord> records =
recordBuffer.consumeRecords(lease);
+ if (processingStrategy == ProcessingStrategy.FLOW_FILE) {
+ final BatchAccumulator batch = new BatchAccumulator();
+ for (final List<ShardFetchResult> shardResults :
resultsByShard.values()) {
+ for (final ShardFetchResult result : shardResults) {
+ batch.updateMillisBehind(result.millisBehindLatest());
+ for (final DeaggregatedRecord record :
result.records()) {
+ batch.addBytes(record.data().length);
+ }
+ }
+ writeFlowFilePerRecord(session, shardResults, streamName,
batch, produced);
+ }
+ totalRecordCount = batch.getRecordCount();
+ totalBytesConsumed = batch.getBytesConsumed();
+ maxMillisBehind = batch.getMaxMillisBehind();
+ } else {
+ for (final Map.Entry<String, List<ShardFetchResult>> entry :
resultsByShard.entrySet()) {
+ final BatchAccumulator batch = new BatchAccumulator();
+ batch.setLastShardId(entry.getKey());
+ for (final ShardFetchResult result : entry.getValue()) {
+ batch.updateMillisBehind(result.millisBehindLatest());
+ batch.updateSequenceRange(result);
+ for (final DeaggregatedRecord record :
result.records()) {
+ batch.addBytes(record.data().length);
+ batch.updateRecordRange(record);
+ }
+ }
- if (records.isEmpty()) {
- recordBuffer.returnBufferLease(lease);
- return;
- }
+ if (processingStrategy ==
ProcessingStrategy.LINE_DELIMITED || processingStrategy ==
ProcessingStrategy.DEMARCATOR) {
+ final byte[] delimiter;
+ if (processingStrategy ==
ProcessingStrategy.LINE_DELIMITED) {
+ delimiter = NEWLINE_DELIMITER;
+ } else {
+ final String demarcatorValue =
context.getProperty(MESSAGE_DEMARCATOR).getValue();
+ delimiter =
demarcatorValue.getBytes(StandardCharsets.UTF_8);
+ }
+ writeDelimited(session, entry.getValue(), streamName,
batch, delimiter, produced);
+ } else {
+ writeRecordOriented(session, context,
entry.getValue(), streamName, batch, produced, parseFailures);
+ }
- final String shardId = lease.shardId();
- switch (processingStrategy) {
- case FLOW_FILE -> processRecordsAsRaw(session, shardId,
records);
- case RECORD -> processRecordsWithReader(session, shardId,
records);
- case DEMARCATOR -> processRecordsAsDemarcated(session,
shardId, records);
+ totalRecordCount += batch.getRecordCount();
+ totalBytesConsumed += batch.getBytesConsumed();
+ maxMillisBehind = Math.max(maxMillisBehind,
batch.getMaxMillisBehind());
+ }
}
+ } catch (final Exception e) {
+ session.remove(produced);
+ session.remove(parseFailures);
+ throw e;
+ }
+
+ return new WriteResult(produced, parseFailures, totalRecordCount,
totalBytesConsumed, maxMillisBehind);
+ }
- session.adjustCounter("Records Processed", records.size(), false);
+ private void writeFlowFilePerRecord(final ProcessSession session, final
List<ShardFetchResult> results,
+ final String streamName, final
BatchAccumulator batch, final List<FlowFile> output) {
+ for (final ShardFetchResult result : results) {
+ for (final DeaggregatedRecord record : result.records()) {
+ final byte[] recordBytes = record.data();
+ FlowFile flowFile = session.create();
+ try {
+ flowFile = session.write(flowFile, out ->
out.write(recordBytes));
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(ATTR_STREAM_NAME, streamName);
+ attributes.put(ATTR_SHARD_ID, result.shardId());
+ attributes.put(ATTR_FIRST_SEQUENCE,
record.sequenceNumber());
+ attributes.put(ATTR_LAST_SEQUENCE,
record.sequenceNumber());
+ attributes.put(ATTR_FIRST_SUBSEQUENCE,
String.valueOf(record.subSequenceNumber()));
+ attributes.put(ATTR_LAST_SUBSEQUENCE,
String.valueOf(record.subSequenceNumber()));
+ attributes.put(ATTR_PARTITION_KEY, record.partitionKey());
+ if (record.approximateArrivalTimestamp() != null) {
+ attributes.put(ATTR_ARRIVAL_TIMESTAMP,
String.valueOf(record.approximateArrivalTimestamp().toEpochMilli()));
+ }
+ attributes.put("record.count", "1");
+ if (result.millisBehindLatest() >= 0) {
+ attributes.put(ATTR_MILLIS_BEHIND,
String.valueOf(result.millisBehindLatest()));
+ }
- session.commitAsync(
- () -> commitRecords(lease),
- __ -> rollbackRecords(lease)
- );
- } catch (final RuntimeException e) {
- rollbackRecords(lease);
- throw e;
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ session.getProvenanceReporter().receive(flowFile,
buildTransitUri(streamName, result.shardId()));
+ output.add(flowFile);
+ batch.incrementRecordCount();
+ } catch (final Exception e) {
+ session.remove(flowFile);
+ throw e;
+ }
+ }
}
}
- private void commitRecords(final Lease lease) {
+ private void writeDelimited(final ProcessSession session, final
List<ShardFetchResult> results,
+ final String streamName, final
BatchAccumulator batch, final byte[] delimiter,
+ final List<FlowFile> output) {
+ FlowFile flowFile = session.create();
try {
- recordBuffer.commitConsumedRecords(lease);
- } finally {
- recordBuffer.returnBufferLease(lease);
+ flowFile = session.write(flowFile, new OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws IOException
{
+ boolean first = true;
+ for (final ShardFetchResult result : results) {
+ for (final DeaggregatedRecord record :
result.records()) {
+ if (!first) {
+ out.write(delimiter);
+ }
+ out.write(record.data());
+ first = false;
+ batch.incrementRecordCount();
+ }
+ }
+ }
+ });
+
+ flowFile = session.putAllAttributes(flowFile,
createFlowFileAttributes(streamName, batch));
+ session.getProvenanceReporter().receive(flowFile,
buildTransitUri(streamName, batch.getLastShardId()));
+ output.add(flowFile);
+ } catch (final Exception e) {
+ session.remove(flowFile);
+ throw e;
}
}
- private void rollbackRecords(final Lease lease) {
+ /**
+ * Writes Kinesis records as NiFi records using the configured Record
Reader and Record Writer.
+ *
+ * <p>This method may appear unnecessarily complex, but it is intended to
address specific requirements:</p>
+ * <ul>
+ * <li>Keep records ordered in the same order they are received from
Kinesis</li>
+ * <li>Create as few FlowFiles as necessary, keeping many records
together in larger FlowFiles for performance reasons.</li>
+ * </ul>
+ *
+ * <p>Alternative options have been considered, as well:</p>
+ * <ul>
+ * <li>Read each Record one at a time with a separate RecordReader. If
its schema is different than the previous
+ * record, create a new FlowFile. However, when the stream is filled
with JSON and many fields are nullable, this
+ * can look like a different schema for each Record when inference
is used, thus creating many tiny FlowFiles.</li>
+ * <li>Read each Record one at a time with a separate RecordReader. Map
the RecordSchema to the existing RecordWriter
+ * for that schema, if one exists, and write to that writer; if none
exists, create a new one. This results in better
+ * grouping in many cases, but it results in the output being
reordered, as we may write records 1, 2, 3 to writers
+ * A, B, A.</li>
+ * <li>Create a single InputStream and RecordReader for the entire
batch. Create a single Writer for the entire batch.
+ * This way, we infer a single schema for the entire batch that is
appropriate for all records. This bundles all records
+ * in the batch into a single FlowFile, which is ideal. However,
this approach fails when we are not inferring the schema
+ * and the records do not all have the same schema. In that case, we
can fail when attempting to read the records or when
+ * we attempt to write the records due to schema
incompatibility.</li>
+ * </ul>
+ *
+ * <p>
+ * Additionally, the existing RecordSchema API does not tell us whether
or not a schema was inferred,
+ * so we cannot easily make a decision based on that knowledge.
Therefore, we have taken an approach that
+ * attempts to process data using our preferred method, falling back as
necessary to other options.
+ * </p>
+ *
+ * <p>
+ * The primary path ({@link #writeRecordBatch}) combines all records
into a single InputStream
+ * via {@link KinesisRecordInputStream} and creates one RecordReader.
This is optimal for formats
+ * like JSON where the schema is inferred from the data: a single
InputStream lets the reader see
+ * all records and produce a unified schema for the writer.
+ * </p>
+ *
+ * <p>
+ * However, this approach fails when records carry incompatible embedded
schemas (e.g. Avro
+ * containers with different field sets). The single reader sees only
the first schema and cannot
+ * parse subsequent records that differ from it. When this happens, the
method falls back to
+ * {@link #writeRecordBatchPerRecord}, which processes each record
individually and splits output
+ * across multiple FlowFiles when schemas change.
+ * </p>
+ */
+ private void writeRecordOriented(final ProcessSession session, final
ProcessContext context,
+ final List<ShardFetchResult> results,
final String streamName,
+ final BatchAccumulator batch, final
List<FlowFile> output,
+ final List<FlowFile> parseFailureOutput) {
+
+ final List<DeaggregatedRecord> allRecords = new ArrayList<>();
+ for (final ShardFetchResult result : results) {
+ allRecords.addAll(result.records());
+ }
+
try {
- recordBuffer.rollbackConsumedRecords(lease);
- } finally {
- recordBuffer.returnBufferLease(lease);
+ final RecordReaderFactory readerFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+ final OutputStrategy outputStrategy =
context.getProperty(OUTPUT_STRATEGY).asAllowableValue(OutputStrategy.class);
+ writeRecordBatch(session, readerFactory, writerFactory,
outputStrategy,
+ allRecords, streamName, batch, output);
+ } catch (final Exception e) {
+ getLogger().debug("Combined-stream record processing failed;
falling back to per-record processing", e);
+ batch.resetRecordCount();
+ final RecordBatchResult result =
writeRecordBatchPerRecord(session, context, allRecords, streamName, batch);
+ output.addAll(result.output());
+ parseFailureOutput.addAll(result.parseFailures());
}
}
- private void processRecordsAsRaw(final ProcessSession session, final
String shardId, final List<KinesisClientRecord> records) {
- for (final KinesisClientRecord record : records) {
- FlowFile flowFile = session.create();
- flowFile = session.putAllAttributes(flowFile,
ConsumeKinesisAttributes.fromKinesisRecords(streamName, shardId, record,
record));
+ private void writeRecordBatch(final ProcessSession session, final
RecordReaderFactory readerFactory,
+ final RecordSetWriterFactory writerFactory,
final OutputStrategy outputStrategy,
+ final List<DeaggregatedRecord> records,
+ final String streamName, final
BatchAccumulator batch, final List<FlowFile> output) {
- flowFile = session.write(flowFile, out -> {
- try (final WritableByteChannel channel =
Channels.newChannel(out)) {
- channel.write(record.data());
+ FlowFile flowFile = session.create();
+ try {
+ flowFile = session.write(flowFile, new OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws IOException
{
+ try (final InputStream kinesisInput = new
KinesisRecordInputStream(records);
+ final RecordReader reader =
readerFactory.createRecordReader(Map.of(), kinesisInput, -1, getLogger())) {
+
+ RecordSchema writeSchema = reader.getSchema();
+ if (outputStrategy == OutputStrategy.INJECT_METADATA) {
+ final List<RecordField> fields = new
ArrayList<>(writeSchema.getFields());
+ fields.add(KinesisRecordMetadata.FIELD_METADATA);
+ writeSchema = new SimpleRecordSchema(fields);
+ } else if (outputStrategy ==
OutputStrategy.USE_WRAPPER) {
+ writeSchema = new SimpleRecordSchema(List.of(
+ KinesisRecordMetadata.FIELD_METADATA,
+ new RecordField(WRAPPER_VALUE_FIELD,
RecordFieldType.RECORD.getRecordDataType(writeSchema))));
+ }
+
+ try (final RecordSetWriter writer =
writerFactory.createWriter(getLogger(), writeSchema, out, Map.of())) {
+ writer.beginRecordSet();
+
+ int recordIndex = 0;
+ org.apache.nifi.serialization.record.Record
nifiRecord;
+ while ((nifiRecord = reader.nextRecord()) != null)
{
+ if (recordIndex < records.size()) {
+ final DeaggregatedRecord record =
records.get(recordIndex);
+ nifiRecord = decorateRecord(nifiRecord,
record, record.shardId(), streamName, outputStrategy, writeSchema);
+ recordIndex++;
+ }
+
+ writer.write(nifiRecord);
+ batch.incrementRecordCount();
+ }
+
+ writer.finishRecordSet();
+ }
+ } catch (final MalformedRecordException |
SchemaNotFoundException e) {
+ throw new IOException(e);
+ }
}
});
- session.getProvenanceReporter().receive(flowFile,
ProvenanceTransitUriFormat.toTransitUri(streamName, shardId));
-
- session.transfer(flowFile, REL_SUCCESS);
+ flowFile = session.putAllAttributes(flowFile,
createFlowFileAttributes(streamName, batch));
+ session.getProvenanceReporter().receive(flowFile,
buildTransitUri(streamName, batch.getLastShardId()));
+ output.add(flowFile);
+ } catch (final Exception e) {
+ session.remove(flowFile);
+ throw e;
}
}
- private void processRecordsWithReader(final ProcessSession session, final
String shardId, final List<KinesisClientRecord> records) {
- final ReaderRecordProcessor recordProcessor = readerRecordProcessor;
- if (recordProcessor == null) {
- throw new IllegalStateException("RecordProcessor has not been
initialized");
- }
+ /**
+ * Fallback path that processes each Kinesis record individually,
splitting output across multiple
+ * FlowFiles when the record schema changes between consecutive records.
+ *
+ * <p>This is invoked when the combined-stream approach ({@link
#writeRecordBatch}) fails, which
+ * typically happens when the batch contains records with incompatible
embedded schemas (e.g. Avro
+ * containers whose field sets differ). Rather than grouping or buffering
records up front, this
+ * method makes a single pass: for each record it creates a RecordReader,
compares the schema to
+ * the current writer's schema, and either continues writing to the same
FlowFile or finalizes the
+ * current FlowFile and starts a new one. This preserves record ordering
without demultiplexing.</p>
+ *
+ * <p>Records that cannot be parsed (empty data, malformed content,
missing schema) are collected
+ * and routed to the parse-failure relationship at the end.</p>
+ *
+ * @param session the current process session
+ * @param context the current process context (used to resolve Record
Reader, Record Writer, and Output Strategy)
+ * @param records the Kinesis records to process, in order
+ * @param streamName the Kinesis stream name, used for FlowFile attributes
+ * @param batch accumulator for batch-level attributes and record
counting
+ * @return a {@link RecordBatchResult} containing the successfully written
FlowFiles and any parse-failure FlowFiles
+ */
+ private RecordBatchResult writeRecordBatchPerRecord(final ProcessSession
session, final ProcessContext context,
+ final
List<DeaggregatedRecord> records,
+ final String
streamName, final BatchAccumulator batch) {
- final ProcessingResult result =
recordProcessor.processRecords(session, streamName, shardId, records);
+ final RecordReaderFactory readerFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+ final OutputStrategy outputStrategy =
context.getProperty(OUTPUT_STRATEGY).asAllowableValue(OutputStrategy.class);
- session.transfer(result.successFlowFiles(), REL_SUCCESS);
- session.transfer(result.parseFailureFlowFiles(), REL_PARSE_FAILURE);
- }
+ final List<FlowFile> output = new ArrayList<>();
+ final List<FlowFile> parseFailureOutput = new ArrayList<>();
+ final List<DeaggregatedRecord> unparseable = new ArrayList<>();
+ FlowFile currentFlowFile = null;
+ OutputStream currentOut = null;
+ RecordSetWriter currentWriter = null;
+ RecordSchema currentReadSchema = null;
+ RecordSchema currentWriteSchema = null;
+ int currentRecordCount = 0;
- private void processRecordsAsDemarcated(final ProcessSession session,
final String shardId, final List<KinesisClientRecord> records) {
- final byte[] demarcator = demarcatorValue;
- if (demarcator == null) {
- throw new IllegalStateException("Demarcator has not been
initialized");
- }
+ try {
+ for (final DeaggregatedRecord record : records) {
- FlowFile flowFile = session.create();
+ if (record.data().length == 0) {
+ unparseable.add(record);
+ continue;
+ }
- final Map<String, String> attributes =
ConsumeKinesisAttributes.fromKinesisRecords(streamName, shardId,
records.getFirst(), records.getLast());
- attributes.put(RECORD_COUNT, String.valueOf(records.size()));
- flowFile = session.putAllAttributes(flowFile, attributes);
+ RecordSchema readSchema = null;
+ final List<org.apache.nifi.serialization.record.Record>
parsedRecords = new ArrayList<>();
+ RecordReader reader = null;
+ try {
+ reader = readerFactory.createRecordReader(Map.of(), new
ByteArrayInputStream(record.data()), record.data().length, getLogger());
+ readSchema = reader.getSchema();
+ org.apache.nifi.serialization.record.Record nifiRecord;
+ while ((nifiRecord = reader.nextRecord()) != null) {
+ parsedRecords.add(nifiRecord);
+ }
+ } catch (final MalformedRecordException |
SchemaNotFoundException | IOException e) {
+ getLogger().debug("Kinesis record seq {} classified as
unparseable: {}",
+ record.sequenceNumber(), e.getMessage());
+ unparseable.add(record);
+ continue;
+ } finally {
+ closeQuietly(reader);
+ }
- flowFile = session.write(flowFile, out -> {
- try (final WritableByteChannel channel = Channels.newChannel(out))
{
- boolean writtenData = false;
- for (final KinesisClientRecord record : records) {
- if (writtenData) {
- out.write(demarcator);
+ if (parsedRecords.isEmpty()) {
+ unparseable.add(record);
+ continue;
+ }
+
+ if (currentWriter == null ||
!readSchema.equals(currentReadSchema)) {
+ if (currentWriter != null) {
+ currentWriter.finishRecordSet();
+ currentWriter.close();
+ currentOut.close();
+ final Map<String, String> attrs =
createFlowFileAttributes(streamName, batch);
+ attrs.put("record.count",
String.valueOf(currentRecordCount));
+ currentFlowFile =
session.putAllAttributes(currentFlowFile, attrs);
+
session.getProvenanceReporter().receive(currentFlowFile,
buildTransitUri(streamName, batch.getLastShardId()));
+ output.add(currentFlowFile);
+ currentFlowFile = null;
}
- channel.write(record.data());
- writtenData = true;
+
+ currentReadSchema = readSchema;
+ currentWriteSchema = buildWriteSchema(readSchema,
outputStrategy);
+ currentFlowFile = session.create();
+ currentOut = session.write(currentFlowFile);
+ currentWriter = writerFactory.createWriter(getLogger(),
currentWriteSchema, currentOut, Map.of());
+ currentWriter.beginRecordSet();
+ currentRecordCount = 0;
}
+
+ for (final org.apache.nifi.serialization.record.Record parsed
: parsedRecords) {
+ // TODO: We need to use decorateRecord above also.
+ final org.apache.nifi.serialization.record.Record
decorated =
+ decorateRecord(parsed, record, record.shardId(),
streamName, outputStrategy, currentWriteSchema);
+ currentWriter.write(decorated);
+ currentRecordCount++;
+ batch.incrementRecordCount();
+ }
+ }
+
+ if (currentWriter != null) {
+ currentWriter.finishRecordSet();
+ currentWriter.close();
+ currentOut.close();
+ final Map<String, String> attrs =
createFlowFileAttributes(streamName, batch);
+ attrs.put("record.count", String.valueOf(currentRecordCount));
+ currentFlowFile = session.putAllAttributes(currentFlowFile,
attrs);
+ session.getProvenanceReporter().receive(currentFlowFile,
buildTransitUri(streamName, batch.getLastShardId()));
+ output.add(currentFlowFile);
+ currentFlowFile = null;
+ }
+ } catch (final Exception e) {
+ closeQuietly(currentWriter);
+ closeQuietly(currentOut);
+ if (currentFlowFile != null) {
+ session.remove(currentFlowFile);
+ }
+ if (e instanceof RuntimeException re) {
+ throw re;
}
- });
+ throw new ProcessException(e);
+ }
- session.getProvenanceReporter().receive(flowFile,
ProvenanceTransitUriFormat.toTransitUri(streamName, shardId));
+ if (!unparseable.isEmpty()) {
+ getLogger().warn("Encountered {} unparseable record(s) in shard
{}; routing to parse failure",
+ unparseable.size(), batch.getLastShardId());
+ writeParseFailures(session, unparseable, streamName, batch,
parseFailureOutput);
+ }
- session.transfer(flowFile, REL_SUCCESS);
+ return new RecordBatchResult(output, parseFailureOutput);
}
/**
- * An adapter between Kinesis Consumer Library and {@link RecordBuffer}.
+ * Adjusts a read schema to the write schema required by the configured
OutputStrategy. For
+ * {@code INJECT_METADATA} the metadata field is appended; for {@code
USE_WRAPPER} a two-field
+ * wrapper schema is created; for {@code USE_VALUE} the read schema is
returned unchanged.
*/
- private static class ConsumeKinesisRecordProcessor implements
ShardRecordProcessor {
+ private static RecordSchema buildWriteSchema(final RecordSchema
readSchema, final OutputStrategy outputStrategy) {
+ return switch (outputStrategy) {
+ case INJECT_METADATA -> {
+ final List<RecordField> fields = new
ArrayList<>(readSchema.getFields());
+ fields.add(KinesisRecordMetadata.FIELD_METADATA);
+ yield new SimpleRecordSchema(fields);
+ }
+ case USE_WRAPPER -> {
+ yield new SimpleRecordSchema(List.of(
+ KinesisRecordMetadata.FIELD_METADATA,
+ new RecordField(WRAPPER_VALUE_FIELD,
RecordFieldType.RECORD.getRecordDataType(readSchema))));
+ }
+ case USE_VALUE -> readSchema;
+ };
+ }
+
+ /**
+ * Attaches Kinesis metadata to a NiFi record according to the configured
OutputStrategy.
+ */
+ private static org.apache.nifi.serialization.record.Record decorateRecord(
+ final org.apache.nifi.serialization.record.Record nifiRecord,
+ final DeaggregatedRecord kinesisRecord, final String shardId,
+ final String streamName, final OutputStrategy outputStrategy,
+ final RecordSchema writeSchema) {
+ return switch (outputStrategy) {
+ case INJECT_METADATA -> {
+ final Map<String, Object> values = new
HashMap<>(nifiRecord.toMap());
+ values.put(KinesisRecordMetadata.METADATA,
+
KinesisRecordMetadata.composeMetadataObject(kinesisRecord, streamName,
shardId));
+ yield new MapRecord(writeSchema, values);
+ }
+ case USE_WRAPPER -> {
+ final Map<String, Object> wrapperValues = new HashMap<>(2,
1.0f);
+ wrapperValues.put(KinesisRecordMetadata.METADATA,
+
KinesisRecordMetadata.composeMetadataObject(kinesisRecord, streamName,
shardId));
+ wrapperValues.put(WRAPPER_VALUE_FIELD, nifiRecord);
+ yield new MapRecord(writeSchema, wrapperValues);
+ }
+ case USE_VALUE -> nifiRecord;
+ };
+ }
- private final RecordBuffer.ForKinesisClientLibrary recordBuffer;
- private volatile @Nullable ShardBufferId bufferId;
+ private void writeParseFailures(final ProcessSession session, final
List<DeaggregatedRecord> unparseable,
+ final String streamName, final
BatchAccumulator batch, final List<FlowFile> parseFailureOutput) {
- ConsumeKinesisRecordProcessor(final MemoryBoundRecordBuffer
recordBuffer) {
- this.recordBuffer = recordBuffer;
+ for (final DeaggregatedRecord record : unparseable) {
+ FlowFile flowFile = session.create();
+ try {
+ final byte[] rawBytes = record.data();
+ flowFile = session.write(flowFile, out -> out.write(rawBytes));
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(ATTR_STREAM_NAME, streamName);
+ attributes.put(ATTR_FIRST_SEQUENCE, record.sequenceNumber());
+ attributes.put(ATTR_LAST_SEQUENCE, record.sequenceNumber());
+ attributes.put(ATTR_FIRST_SUBSEQUENCE,
String.valueOf(record.subSequenceNumber()));
+ attributes.put(ATTR_LAST_SUBSEQUENCE,
String.valueOf(record.subSequenceNumber()));
+ attributes.put(ATTR_PARTITION_KEY, record.partitionKey());
+ if (record.approximateArrivalTimestamp() != null) {
+ attributes.put(ATTR_ARRIVAL_TIMESTAMP,
String.valueOf(record.approximateArrivalTimestamp().toEpochMilli()));
+ }
+ attributes.put("record.count", "1");
+ if (batch.getLastShardId() != null) {
+ attributes.put(ATTR_SHARD_ID, batch.getLastShardId());
+ }
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ parseFailureOutput.add(flowFile);
+ } catch (final Exception e) {
+ session.remove(flowFile);
+ throw e;
+ }
}
+ }
- @Override
- public void initialize(final InitializationInput initializationInput) {
- bufferId =
recordBuffer.createBuffer(initializationInput.shardId());
+ private static long estimateResultBytes(final ShardFetchResult result) {
+ long bytes = 0;
+ for (final DeaggregatedRecord record : result.records()) {
+ bytes += record.data().length;
+ }
+ return bytes;
+ }
+
+ private static Map<String, String> createFlowFileAttributes(final String
streamName, final BatchAccumulator batch) {
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(ATTR_STREAM_NAME, streamName);
+ attributes.put("record.count", String.valueOf(batch.getRecordCount()));
+
+ if (batch.getMaxMillisBehind() >= 0) {
+ attributes.put(ATTR_MILLIS_BEHIND,
String.valueOf(batch.getMaxMillisBehind()));
+ }
+ if (batch.getLastShardId() != null) {
+ attributes.put(ATTR_SHARD_ID, batch.getLastShardId());
+ }
+ if (batch.getMinSequenceNumber() != null) {
+ attributes.put(ATTR_FIRST_SEQUENCE, batch.getMinSequenceNumber());
+ }
+ if (batch.getMaxSequenceNumber() != null) {
+ attributes.put(ATTR_LAST_SEQUENCE, batch.getMaxSequenceNumber());
+ }
+ if (batch.getMinSubSequenceNumber() != Long.MAX_VALUE) {
+ attributes.put(ATTR_FIRST_SUBSEQUENCE,
String.valueOf(batch.getMinSubSequenceNumber()));
+ }
+ if (batch.getMaxSubSequenceNumber() != Long.MIN_VALUE) {
+ attributes.put(ATTR_LAST_SUBSEQUENCE,
String.valueOf(batch.getMaxSubSequenceNumber()));
+ }
+ if (batch.getLastPartitionKey() != null) {
+ attributes.put(ATTR_PARTITION_KEY, batch.getLastPartitionKey());
+ }
+ if (batch.getEarliestArrivalTimestamp() != null) {
+ attributes.put(ATTR_ARRIVAL_TIMESTAMP,
String.valueOf(batch.getEarliestArrivalTimestamp().toEpochMilli()));
+ }
+
+ return attributes;
+ }
+
+ private static void closeQuietly(final AutoCloseable closeable) {
+ if (closeable != null) {
+ try {
+ closeable.close();
+ } catch (final Exception ignored) {
+ }
+ }
+ }
+
+ private static String buildTransitUri(final String streamName, final
String shardId) {
+ if (shardId == null || shardId.isEmpty()) {
+ return "kinesis://" + streamName;
+ }
+ return "kinesis://" + streamName + "/" + shardId;
+ }
+
+ // Exposed for testing to allow injection of mock Shard Manager
+ protected KinesisShardManager createShardManager(final KinesisClient
kinesisClient, final DynamoDbClient dynamoDbClient,
+ final ComponentLog logger, final String checkpointTableName, final
String streamName) {
+ return new KinesisShardManager(kinesisClient, dynamoDbClient, logger,
checkpointTableName, streamName);
+ }
+
+ // Exposed for testing to allow injection of a mock client
+ protected KinesisConsumerClient createConsumerClient(final KinesisClient
kinesisClient, final ComponentLog logger, final boolean efoMode) {
+ if (efoMode) {
+ return new EfoKinesisClient(kinesisClient, logger);
+ }
+ return new PollingKinesisClient(kinesisClient, logger);
+ }
+
    /**
     * Result of the per-record fallback path: the FlowFiles successfully written to the success
     * relationship plus any FlowFiles destined for the parse-failure relationship.
     */
    private record RecordBatchResult(List<FlowFile> output, List<FlowFile> parseFailures) {
    }
+
+ private static final class KinesisRecordInputStream extends InputStream {
        // Backing record payloads in arrival order; empty payloads are filtered out at construction.
        private final List<byte[]> chunks;
        // Index of the chunk currently being read.
        private int chunkIndex;
        // Read position within the current chunk.
        private int positionInChunk;
        // NOTE(review): these mark fields are not referenced by any method visible here --
        // presumably reserved for mark/reset support; confirm they are actually used elsewhere
        // in this class.
        private int markChunkIndex = -1;
        private int markPositionInChunk;

        // Snapshots the non-empty record payloads; the byte arrays themselves are not copied.
        KinesisRecordInputStream(final List<DeaggregatedRecord> records) {
            this.chunks = new ArrayList<>(records.size());
            for (final DeaggregatedRecord record : records) {
                final byte[] data = record.data();
                if (data.length > 0) {
                    chunks.add(data);
                }
            }
        }
@Override
- public void processRecords(final ProcessRecordsInput
processRecordsInput) {
- if (bufferId == null) {
- throw new IllegalStateException("Buffer ID not found: Record
Processor not initialized");
+ public int read() {
+ while (chunkIndex < chunks.size()) {
+ final byte[] current = chunks.get(chunkIndex);
+ if (positionInChunk < current.length) {
+ return current[positionInChunk++] & 0xFF;
+ }
+ chunkIndex++;
+ positionInChunk = 0;
}
- recordBuffer.addRecords(bufferId, processRecordsInput.records(),
processRecordsInput.checkpointer());
+ return -1;
}
@Override
- public void leaseLost(final LeaseLostInput leaseLostInput) {
- if (bufferId != null) {
- recordBuffer.consumerLeaseLost(bufferId);
+ public int read(final byte[] buffer, final int offset, final int
length) {
+ if (chunkIndex >= chunks.size()) {
+ return -1;
+ }
+ if (length == 0) {
+ return 0;
+ }
+
+ int totalRead = 0;
+ while (totalRead < length && chunkIndex < chunks.size()) {
+ final byte[] current = chunks.get(chunkIndex);
+ final int remaining = current.length - positionInChunk;
+ if (remaining <= 0) {
+ chunkIndex++;
+ positionInChunk = 0;
+ continue;
+ }
+
+ final int toRead = Math.min(length - totalRead, remaining);
+ System.arraycopy(current, positionInChunk, buffer, offset +
totalRead, toRead);
+ positionInChunk += toRead;
+ totalRead += toRead;
}
+
+ return totalRead == 0 ? -1 : totalRead;
}
@Override
- public void shardEnded(final ShardEndedInput shardEndedInput) {
- if (bufferId != null) {
- recordBuffer.checkpointEndedShard(bufferId,
shardEndedInput.checkpointer());
+ public int available() {
+ if (chunkIndex >= chunks.size()) {
+ return 0;
}
+ return chunks.get(chunkIndex).length - positionInChunk;
Review Comment:
You are correct that we could. However, I don't believe there's any benefit
to it. So I wouldn't add any complexity, even trivial, if there's no benefit.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]