SAitharaju opened a new pull request, #139:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/139

   ### Add Configurable Retry & Emergency Error Handling to Elasticsearch 8 Sink
   
   #### Description
   
   This PR enhances the reliability of the Elasticsearch 8 sink by introducing 
a configurable retry mechanism for failed requests. It addresses the issue 
where transient errors (like network blips) or "poison pill" records could 
cause immediate job failures.
   
   The new implementation offers two modes of operation:
   
   #### Non-Emergency Mode (Default):
   
   The sink retries a failed request up to a configurable number of times 
(maxRetries).
   
   If the request still fails once that limit is reached, the sink throws a 
FlinkRuntimeException, which fails the Flink job (it then restarts according to 
the configured restart strategy).
   
   **Use Case:** Critical data pipelines where data loss is unacceptable and 
operator intervention is preferred over skipping data.
   
   #### Emergency Mode (Optional):
   
   The sink retries a failed request up to maxRetries times.
   
   If the limit is reached, the failed record is dropped (skipped), and 
processing continues.
   
   A metric numRecordsSkipped is incremented to track data loss.
   
   **Use Case:** 24/7 streaming applications where availability is prioritized 
over perfect data completeness, preventing a single bad record from halting the 
pipeline.
   
   #### Key Changes
   1. Core Logic (Elasticsearch8AsyncWriter)
   
   **Retry Tracking:** Wrapped standard operations in a RetryableOperation 
class to maintain an attemptCount for each record.
   
   **Retry Logic:** Overrode submitRequestEntries to intercept failed batch 
responses. Instead of failing immediately or retrying infinitely, it now checks 
attemptCount against maxRetries.
   
   **Mode Handling:** Implemented the conditional logic:
   
   **Emergency Enabled:** Log warning, increment numRecordsSkipped, and mark 
request as complete (drop).
   
   **Emergency Disabled:** Throw exception to fail the job.
   
   **Fatal Errors:** Removed ConnectException from the fatal error classifier. 
Connection refusals are now treated as retryable, preventing immediate crashes 
during brief network outages.
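
The retry and mode handling described above can be sketched as a small decision function. This is an illustrative model only, not the PR's code: the class `RetryDecisionSketch`, the `Action` enum, and the `decide` and `recordFailure` methods are hypothetical names, the counter is a plain `AtomicLong` standing in for the numRecordsSkipped metric, and it assumes attemptCount counts failed attempts so that a record is exhausted once attemptCount reaches maxRetries.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical, simplified model of the retry decision; not the connector's actual classes. */
class RetryDecisionSketch {

    /** What the writer does with a record whose request just failed. */
    public enum Action { RETRY, DROP, FAIL_JOB }

    /** Stand-in for RetryableOperation: a payload plus the number of failed attempts. */
    public static final class RetryableOperation {
        public final String payload;
        public final int attemptCount;

        public RetryableOperation(String payload, int attemptCount) {
            this.payload = payload;
            this.attemptCount = attemptCount;
        }

        /** Returns a copy with one more failed attempt recorded. */
        public RetryableOperation recordFailure() {
            return new RetryableOperation(payload, attemptCount + 1);
        }
    }

    /** Stand-in for the numRecordsSkipped metric. */
    public static final AtomicLong numRecordsSkipped = new AtomicLong();

    /**
     * Decides what to do after a failure has been recorded on {@code op}.
     * Assumption: the record is exhausted once attemptCount reaches maxRetries.
     */
    public static Action decide(RetryableOperation op, int maxRetries, boolean emergencyMode) {
        if (op.attemptCount < maxRetries) {
            return Action.RETRY; // budget left: re-queue the record
        }
        if (emergencyMode) {
            numRecordsSkipped.incrementAndGet(); // track the data loss
            return Action.DROP; // mark the request complete and move on
        }
        return Action.FAIL_JOB; // the real sink would throw FlinkRuntimeException here
    }

    public static void main(String[] args) {
        // Simulate a record that fails every time, with maxRetries = 3 in emergency mode.
        RetryableOperation op = new RetryableOperation("doc-1", 0);
        Action action;
        do {
            op = op.recordFailure();
            action = decide(op, 3, true);
            System.out.println("failure " + op.attemptCount + " -> " + action);
        } while (action == Action.RETRY);
        System.out.println("numRecordsSkipped = " + numRecordsSkipped.get());
    }
}
```

Keeping the decision in one pure function makes the boundary condition (whether the Nth failure still retries) easy to test separately from the async writer.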
   
   2. Configuration (Elasticsearch8AsyncSinkBuilder)
   
   **Added .setMaxRetries(int):** Configures how many times to retry a failed 
request (default: 5).
   
   **Added .setEmergencyMode(boolean):** Toggles the "drop on failure" behavior 
(default: false).
   
   3. State Persistence (Elasticsearch8AsyncSinkSerializer)
   
   Implemented a custom serializer to persist RetryableOperation state during 
Flink checkpoints.
   
   **Crucial Fix:** The attemptCount is saved and restored. This prevents a 
"poison pill" record from resetting its counter to 0 upon job restart, which 
would otherwise lead to an infinite crash-restart loop.
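
A minimal sketch of why persisting attemptCount matters, using only JDK streams. The class `AttemptCountStateSketch`, its `Entry` type, and the byte layout (attempt count, payload length, payload bytes) are assumptions made for illustration; the PR's Elasticsearch8AsyncSinkSerializer may use a different format and hooks into Flink's own state serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical round trip of a buffered record together with its attempt count. */
class AttemptCountStateSketch {

    /** Stand-in for a buffered RetryableOperation. */
    public static final class Entry {
        public final String payload;
        public final int attemptCount;

        public Entry(String payload, int attemptCount) {
            this.payload = payload;
            this.attemptCount = attemptCount;
        }
    }

    /** Writes the attempt count first, then the length-prefixed UTF-8 payload. */
    public static byte[] serialize(Entry entry) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            byte[] payload = entry.payload.getBytes(StandardCharsets.UTF_8);
            out.writeInt(entry.attemptCount);
            out.writeInt(payload.length);
            out.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the fields back in the same order they were written. */
    public static Entry deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int attemptCount = in.readInt();
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            return new Entry(new String(payload, StandardCharsets.UTF_8), attemptCount);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // A record that already failed 4 times keeps that count across a restore.
        Entry restored = deserialize(serialize(new Entry("{\"id\":1}", 4)));
        System.out.println(restored.payload + " attempts=" + restored.attemptCount);
    }
}
```

If the count were not part of the serialized state, a restored record would start again at 0 and could consume its full retry budget after every restart.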
   
   4. Integration Tests (Elasticsearch8AsyncWriterITCase)
   
   **Robust Setup:** Updated ElasticsearchSinkBaseITCase to use 
HttpWaitStrategy and a centralized waitForElasticsearch check. This fixes flaky 
tests caused by Docker containers being "running" but not yet "ready" (socket 
connection refused).
   
   #### New Tests:
   
   **testEmergencyModeDropsRecords:** Verifies that after N retries, the record 
is dropped, the job continues, and the skipped metric increases.
   
   **testNonEmergencyModeFailsJob:** Verifies that the job correctly fails 
after N retries in default mode.
   
   **testHandlePartiallyFailedBulk:** Ensures partial batch failures trigger 
the retry mechanism correctly without hanging.
   
   ### How to Use
   #### Default Behavior (Safe Mode):
   
   
   ```
   Elasticsearch8AsyncSink<String> sink = new Elasticsearch8AsyncSinkBuilder<String>()
       .setHosts(...)
       .setMaxRetries(10) // Optional: Increase retry attempts
       .build();
   // If a record fails 10 times, the job will fail.
   ```
   
   #### Emergency Mode (Availability Mode):
   
   ```
   Elasticsearch8AsyncSink<String> sink = new Elasticsearch8AsyncSinkBuilder<String>()
       .setHosts(...)
       .setMaxRetries(5)
       .setEmergencyMode(true) // Enable dropping of bad records
       .build();
   // If a record fails 5 times, it is dropped and logged. The job continues.
   ```
   
   ### Verification
   [x] mvn clean test -pl flink-connector-elasticsearch8 -am passed.
   
   [x] Verified Docker integration tests pass consistently with new wait 
strategy.
   
   [x] Code formatting compliant with project standards (mvn spotless:apply).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

