aws-nageshvh commented on code in PR #201: URL: https://github.com/apache/flink-connector-aws/pull/201#discussion_r2274713066
########## flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java: ########## @@ -244,34 +314,121 @@ private void handleFullyFailedRequest( @Override public void close() { - AWSGeneralUtil.closeResources(httpClient, kinesisClient); + try { + kinesisClientProvider.close(); + } catch (IOException e) { + throw new RuntimeException("Failed to close the kinesisClientProvider", e); + } } private void handlePartiallyFailedRequest( PutRecordsResponse response, List<PutRecordsRequestEntry> requestEntries, Consumer<List<PutRecordsRequestEntry>> requestResult) { - LOG.warn( - "KDS Sink failed to write and will retry {} entries to KDS", - response.failedRecordCount()); - numRecordsOutErrorsCounter.inc(response.failedRecordCount()); + int failedRecordCount = response.failedRecordCount(); + LOG.warn("KDS Sink failed to write and will retry {} entries to KDS", failedRecordCount); + numRecordsOutErrorsCounter.inc(failedRecordCount); if (failOnError) { getFatalExceptionCons() .accept(new KinesisStreamsException.KinesisStreamsFailFastException()); return; } - List<PutRecordsRequestEntry> failedRequestEntries = - new ArrayList<>(response.failedRecordCount()); + + List<PutRecordsRequestEntry> failedRequestEntries = new ArrayList<>(failedRecordCount); List<PutRecordsResultEntry> records = response.records(); + // Collect error information and build the list of failed entries + Map<String, ErrorSummary> errorSummaries = + collectErrorSummaries(records, requestEntries, failedRequestEntries); + + // Log aggregated error information + logErrorSummaries(errorSummaries); + + requestResult.accept(failedRequestEntries); + } + + /** + * Collect error summaries from failed records and build a list of failed request entries. 
+ * + * @param records The result entries from the Kinesis response + * @param requestEntries The original request entries + * @param failedRequestEntries List to populate with failed entries (modified as a side effect) + * @return A map of error codes to their summaries + */ + private Map<String, ErrorSummary> collectErrorSummaries( + List<PutRecordsResultEntry> records, + List<PutRecordsRequestEntry> requestEntries, + List<PutRecordsRequestEntry> failedRequestEntries) { + + // We capture error info while minimizing logging overhead in the data path, + // which is critical for maintaining throughput performance + Map<String, ErrorSummary> errorSummaries = new HashMap<>(); + for (int i = 0; i < records.size(); i++) { - if (records.get(i).errorCode() != null) { + PutRecordsResultEntry resultEntry = records.get(i); + String errorCode = resultEntry.errorCode(); + + if (errorCode != null) { + // Track the frequency of each error code to identify patterns + ErrorSummary summary = + errorSummaries.computeIfAbsent( + errorCode, code -> new ErrorSummary(resultEntry.errorMessage())); + summary.incrementCount(); + failedRequestEntries.add(requestEntries.get(i)); } } - requestResult.accept(failedRequestEntries); + return errorSummaries; + } + + /** + * Log aggregated error information at WARN level. 
+ * + * @param errorSummaries Map of error codes to their summaries + */ + private void logErrorSummaries(Map<String, ErrorSummary> errorSummaries) { + // We log aggregated error information at WARN level to ensure visibility in production + // while avoiding the performance impact of logging each individual failure + if (!errorSummaries.isEmpty()) { + StringBuilder errorSummary = new StringBuilder("Kinesis errors summary: "); + errorSummaries.forEach( + (code, summary) -> + errorSummary.append( + String.format( + "[%s: %d records, example: %s] ", + code, + summary.getCount(), + summary.getExampleMessage()))); + + // Using a single WARN log with aggregated information provides operational + // visibility into errors without flooding logs in high-throughput scenarios + LOG.warn("KDS Sink failed to write, " + errorSummary.toString()); Review Comment: I intentionally kept this unchanged, since the same wording is used in multiple places. I don't think we need the full class name, since it already shows up in the logger output. I have expanded "KDS" to "Kinesis Data Stream" so it's more readable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org