SAitharaju opened a new pull request, #138:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/138
### Add Configurable Retry & Emergency Error Handling to Elasticsearch 8 Sink
#### Description
This PR enhances the reliability of the Elasticsearch 8 sink by introducing
a configurable retry mechanism for failed requests. It addresses the issue
where transient errors (like network blips) or "poison pill" records could
cause immediate job failures.
The new implementation offers two modes of operation:
#### Non-Emergency Mode (Default):
The sink retries a failed request up to a configurable limit, maxRetries.
If the request is still failing once the limit is reached, the sink throws a
FlinkRuntimeException, causing the Flink job to restart.
**Use Case:** Critical data pipelines where data loss is unacceptable and
operator intervention is preferred over skipping data.
#### Emergency Mode (Optional):
The sink retries a failed request up to maxRetries.
If the limit is reached, the failed record is dropped (skipped), and
processing continues.
A metric numRecordsSkipped is incremented to track data loss.
**Use Case:** 24/7 streaming applications where availability is prioritized
over perfect data completeness, preventing a single bad record from halting the
pipeline.
#### Key Changes
1. Core Logic (Elasticsearch8AsyncWriter)
**Retry Tracking:** Wrapped standard operations in a RetryableOperation
class to maintain an attemptCount for each record.
**Retry Logic:** Overrode submitRequestEntries to intercept failed batch
responses. Instead of failing immediately or retrying indefinitely, it now
compares each record's attemptCount against maxRetries.
**Mode Handling:** Implemented the conditional logic:
**Emergency Enabled:** Log warning, increment numRecordsSkipped, and mark
request as complete (drop).
**Emergency Disabled:** Throw exception to fail the job.
**Fatal Errors:** Removed ConnectException from the fatal error classifier.
Connection refusals are now treated as retryable, preventing immediate crashes
during brief network outages.
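For reviewers, the retry and mode-handling decision described above can be sketched roughly as follows. This is a simplified illustration, not the code in this PR: `RetryableRecord`, `FailureAction`, and `RetryPolicy` are hypothetical names, and the sketch assumes attemptCount counts the retries already performed for a record.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for RetryableOperation: a payload plus its attempt counter. */
final class RetryableRecord {
    final String payload;
    final int attemptCount; // assumption: number of retries already performed

    RetryableRecord(String payload, int attemptCount) {
        this.payload = payload;
        this.attemptCount = attemptCount;
    }

    /** Returns a copy with the counter advanced, to be re-queued for another attempt. */
    RetryableRecord nextAttempt() {
        return new RetryableRecord(payload, attemptCount + 1);
    }
}

/** What the writer should do with a record whose request just failed. */
enum FailureAction { RETRY, DROP, FAIL_JOB }

/** Hypothetical helper that encodes the two modes described in this PR. */
final class RetryPolicy {
    private final int maxRetries;
    private final boolean emergencyMode;
    // Stand-in for the numRecordsSkipped metric.
    private final AtomicLong numRecordsSkipped = new AtomicLong();

    RetryPolicy(int maxRetries, boolean emergencyMode) {
        this.maxRetries = maxRetries;
        this.emergencyMode = emergencyMode;
    }

    FailureAction onFailure(RetryableRecord record) {
        if (record.attemptCount < maxRetries) {
            return FailureAction.RETRY; // budget left: re-queue with nextAttempt()
        }
        if (emergencyMode) {
            numRecordsSkipped.incrementAndGet(); // track the data loss
            return FailureAction.DROP;           // skip the record, keep the job running
        }
        return FailureAction.FAIL_JOB;           // default mode: caller throws to fail the job
    }

    long skippedCount() {
        return numRecordsSkipped.get();
    }
}
```

In this sketch the caller is responsible for acting on the result: re-queueing `record.nextAttempt()` on `RETRY`, logging a warning on `DROP`, and throwing on `FAIL_JOB`.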
2. Configuration (Elasticsearch8AsyncSinkBuilder)
**Added .setMaxRetries(int):** Configures how many times to retry a failed
request (default: 5).
**Added .setEmergencyMode(boolean):** Toggles the "drop on failure" behavior
(default: false).
3. State Persistence (Elasticsearch8AsyncSinkSerializer)
Implemented a custom serializer to persist RetryableOperation state during
Flink checkpoints.
**Crucial Fix:** The attemptCount is saved and restored. This prevents a
"poison pill" record from resetting its counter to 0 upon job restart, which
would otherwise lead to an infinite crash-restart loop.
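To make the state-persistence point concrete, here is a minimal sketch of writing the attempt counter next to the payload so that it survives a restore. It is an assumption-laden illustration: `RetryState` and `RetryStateCodec` are hypothetical names, the payload is reduced to a string, and the byte layout is not the format used by Elasticsearch8AsyncSinkSerializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical checkpointed state for one buffered record. */
final class RetryState {
    final String payload;
    final int attemptCount;

    RetryState(String payload, int attemptCount) {
        this.payload = payload;
        this.attemptCount = attemptCount;
    }
}

/** Hypothetical codec: version, then attemptCount, then the payload. */
final class RetryStateCodec {
    private static final int VERSION = 1;

    static byte[] serialize(RetryState state) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(VERSION);
            out.writeInt(state.attemptCount); // the counter is part of the snapshot
            out.writeUTF(state.payload);      // writeUTF is limited to 65535 encoded bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static RetryState deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version != VERSION) {
                throw new IllegalStateException("Unsupported state version: " + version);
            }
            int attemptCount = in.readInt(); // restored as saved, not reset to 0
            String payload = in.readUTF();
            return new RetryState(payload, attemptCount);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the counter round-trips through the snapshot, a record that had already used four of five retries resumes at four after recovery instead of starting over.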
4. Integration Tests (Elasticsearch8AsyncWriterITCase)
**Robust Setup:** Updated ElasticsearchSinkBaseITCase to use
HttpWaitStrategy and a centralized waitForElasticsearch check. This fixes flaky
tests caused by Docker containers being "running" but not yet "ready" (socket
connection refused).
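The "running but not ready" problem comes down to polling a readiness probe before the test starts. The sketch below shows only that generic polling loop; `ReadinessWaiter` and its parameters are hypothetical and are not the waitForElasticsearch helper from this PR, which would supply an HTTP check against the container as the probe.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper: poll a readiness probe a bounded number of times. */
final class ReadinessWaiter {

    /**
     * Calls the probe up to maxAttempts times, sleeping between attempts.
     * Returns true as soon as the probe succeeds, false if it never does
     * or if the waiting thread is interrupted.
     */
    static boolean waitUntilReady(BooleanSupplier probe, int maxAttempts, long sleepMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (probe.getAsBoolean()) {
                return true; // service answered: safe to start the test
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    return false;
                }
            }
        }
        return false;
    }
}
```

A probe that treats "connection refused" as "not ready yet" rather than as a test failure is what removes the flakiness described above.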
#### New Tests:
**testEmergencyModeDropsRecords:** Verifies that after N retries, the record
is dropped, the job continues, and the skipped metric increases.
**testNonEmergencyModeFailsJob:** Verifies that the job correctly fails
after N retries in default mode.
**testHandlePartiallyFailedBulk:** Ensures partial batch failures trigger
the retry mechanism correctly without hanging.
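As background for testHandlePartiallyFailedBulk: an Elasticsearch bulk response reports one result per operation, in the order the operations were submitted, so a partially failed batch can be narrowed to just the entries that need another attempt. The sketch below illustrates that selection step only; `PartialBulkFailure` is a hypothetical helper, and the list of per-item failure flags stands in for the real response items.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: pick the entries of a batch whose bulk item reported an error. */
final class PartialBulkFailure {

    /**
     * @param batch      entries in the order they were sent
     * @param itemFailed one flag per entry, true if that item came back with an error
     * @return the failed entries, in their original order, to be handed to the retry logic
     */
    static <T> List<T> selectFailed(List<T> batch, List<Boolean> itemFailed) {
        if (batch.size() != itemFailed.size()) {
            throw new IllegalArgumentException(
                    "Expected one result per entry: " + batch.size() + " vs " + itemFailed.size());
        }
        List<T> failed = new ArrayList<>();
        for (int i = 0; i < batch.size(); i++) {
            if (itemFailed.get(i)) {
                failed.add(batch.get(i)); // successful entries are not resent
            }
        }
        return failed;
    }
}
```

An empty result means the whole batch succeeded, so nothing is re-queued and the request completes normally.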
### How to Use
#### Default Behavior (Safe Mode):
```
// build() returns the sink, so the variable is typed as the sink, not the builder.
Elasticsearch8AsyncSink<String> sink = new Elasticsearch8AsyncSinkBuilder<String>()
    .setHosts(...)
    .setMaxRetries(10) // Optional: increase retry attempts (default: 5)
    .build();
// If a record fails 10 times, the job will fail.
```
#### Emergency Mode (Availability Mode):
```
// build() returns the sink, so the variable is typed as the sink, not the builder.
Elasticsearch8AsyncSink<String> sink = new Elasticsearch8AsyncSinkBuilder<String>()
    .setHosts(...)
    .setMaxRetries(5)
    .setEmergencyMode(true) // Enable dropping of bad records
    .build();
// If a record fails 5 times, it is dropped and logged. The job continues.
```
### Verification
[x] mvn clean test -pl flink-connector-elasticsearch8 -am passed.
[x] Verified Docker integration tests pass consistently with new wait
strategy.
[x] Code formatting compliant with project standards (mvn spotless:apply).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]