featzhang created FLINK-39176:
---------------------------------
Summary: Introduce Node Health Management & Quarantine Framework for ResourceManager
Key: FLINK-39176
URL: https://issues.apache.org/jira/browse/FLINK-39176
Project: Flink
Issue Type: New Feature
Components: Runtime / Configuration, Runtime / Coordination
Reporter: featzhang
What is the purpose of the change
Currently, Apache Flink lacks a comprehensive, cluster-level node health
management mechanism. FLIP-224 introduced a blocklist mechanism that speculative
execution for batch jobs relies on, but that mechanism has several limitations:
* Not applicable to streaming jobs
* Not a general-purpose faulty node governance mechanism
* No complete Web UI integration
* No full recovery support after RM/JM failover
* Not a cluster-level resource scheduling quarantine mechanism
This improvement introduces a Node Health Management & Quarantine Framework at
the ResourceManager level. Operators can manually quarantine a node, and slots
on quarantined nodes are then automatically filtered out during slot allocation.
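The core idea, excluding slots on quarantined nodes before allocation, can be sketched as follows. This is a minimal illustration rather than the proposed implementation. The FreeSlot record, the allocatableSlots method and the Predicate-based lookup are hypothetical stand-ins for the SlotManager and NodeHealthManager integration. The sketch assumes Java 16+ for records.

{code:java}
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class QuarantineSlotFilterSketch {

    /** Hypothetical free-slot descriptor: which node (host) a slot lives on. */
    record FreeSlot(String slotId, String nodeId) {}

    /**
     * Returns only the slots whose node is not quarantined. The predicate stands in
     * for a NodeHealthManager lookup; when the feature is disabled it can be
     * "nodeId -> false", so every slot passes through unchanged.
     */
    static List<FreeSlot> allocatableSlots(List<FreeSlot> freeSlots, Predicate<String> isQuarantined) {
        return freeSlots.stream()
                .filter(slot -> !isQuarantined.test(slot.nodeId()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Set<String> quarantined = Set.of("worker-2");
        List<FreeSlot> free = List.of(
                new FreeSlot("s1", "worker-1"),
                new FreeSlot("s2", "worker-2"),
                new FreeSlot("s3", "worker-3"));
        // Only the slots on worker-1 and worker-3 remain allocatable.
        System.out.println(allocatableSlots(free, quarantined::contains));
    }
}
{code}

Filtering at allocation time leaves already-running slots on a quarantined node untouched. In this sketch, quarantine only affects where new work is placed.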
Brief change log
* Introduce NodeHealthManager abstraction with pluggable implementations
* Integrate with SlotManager to filter out quarantined nodes during slot
allocation
* Add REST API endpoints for manual node quarantine management
{code:java}
(POST/DELETE /cluster/nodes/{nodeId}/quarantine, GET /cluster/nodes/quarantine)
{code}
* Add configuration options (cluster.node-health.enabled,
cluster.node-health.default-duration, cluster.node-health.max-entries)
* Add a cleanup scheduler that removes expired quarantine entries
* Add comprehensive unit and integration tests
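The cleanup scheduler for expired quarantine entries might look roughly like the following sketch. It assumes each entry stores its expiry instant in a ConcurrentHashMap. A plain ScheduledExecutorService stands in for whatever executor the ResourceManager would actually use. All class and method names here are hypothetical.

{code:java}
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class QuarantineCleanupSketch {

    /** nodeId -> instant at which the quarantine expires (hypothetical layout). */
    private final Map<String, Instant> expiries = new ConcurrentHashMap<>();

    void quarantine(String nodeId, Instant expiresAt) {
        expiries.put(nodeId, expiresAt);
    }

    boolean isQuarantined(String nodeId) {
        return expiries.containsKey(nodeId);
    }

    /** Removes every entry whose expiry is at or before the given time; returns the number removed. */
    int removeExpired(Instant now) {
        int removed = 0;
        for (Map.Entry<String, Instant> e : expiries.entrySet()) {
            // remove(key, value) fails if the entry was refreshed concurrently, so a
            // node re-quarantined in the meantime is not dropped by mistake.
            if (!e.getValue().isAfter(now) && expiries.remove(e.getKey(), e.getValue())) {
                removed++;
            }
        }
        return removed;
    }

    /** Runs removeExpired periodically; the caller owns the executor and must shut it down. */
    ScheduledExecutorService startCleanup(Duration interval) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(
                () -> removeExpired(Instant.now()),
                interval.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
        return executor;
    }

    public static void main(String[] args) {
        QuarantineCleanupSketch sketch = new QuarantineCleanupSketch();
        Instant now = Instant.now();
        sketch.quarantine("worker-1", now.minusSeconds(1));           // already expired
        sketch.quarantine("worker-2", now.plus(Duration.ofMinutes(10)));
        System.out.println("removed=" + sketch.removeExpired(now));
        System.out.println("worker-2 quarantined=" + sketch.isQuarantined("worker-2"));
    }
}
{code}

Passing the current time into removeExpired keeps the expiry logic deterministic in unit tests. Only the scheduled task reads the wall clock.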
Verifying this change
This change can be verified by:
* Unit tests for NodeHealthManager implementations
* Integration test NodeQuarantineSlotFilteringITCase verifying end-to-end slot
filtering
* Manual testing of REST API endpoints
* Configuration validation tests
Does this pull request potentially affect one of the following parts:
* Dependencies (does it add or upgrade a dependency): No
* The public API, i.e., is any changed class annotated with @Public(Evolving): No
* The serializers: No
* The runtime per-record code paths: No
* Anything that affects deployment or recovery: Yes (ResourceManager-level changes)
* The S3 file system connector: No
Documentation
* Does this pull request introduce a new feature? Yes
* If yes, how is the feature documented? JavaDocs and configuration docs
Implementation Details
Architecture:
{code}
NodeHealthManager (Interface)
├── DefaultNodeHealthManager (ConcurrentHashMap-based)
└── NoOpNodeHealthManager (Disabled state)
Integration Point:
SlotManager.allocateSlots() → filter quarantined nodes
{code}
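The interface hierarchy above could take roughly the following shape. This is a sketch under stated assumptions: the method names (quarantine, isQuarantined, release, listQuarantined) and the expiry-instant storage are hypothetical. The constructor argument reflects cluster.node-health.max-entries. DefaultNodeHealthManager keeps lookups lock-free on the allocation path and serializes only the method that adds entries. NoOpNodeHealthManager returns constants so the disabled path costs almost nothing.

{code:java}
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical shape of the NodeHealthManager abstraction; method names are illustrative. */
interface NodeHealthManager {
    /** Quarantines a node until now + duration; returns false if the entry cap is reached. */
    boolean quarantine(String nodeId, Duration duration, Instant now);
    boolean isQuarantined(String nodeId, Instant now);
    boolean release(String nodeId);
    Map<String, Instant> listQuarantined();
}

/** Used when cluster.node-health.enabled is false: every call is a constant-time no-op. */
final class NoOpNodeHealthManager implements NodeHealthManager {
    public boolean quarantine(String nodeId, Duration duration, Instant now) { return false; }
    public boolean isQuarantined(String nodeId, Instant now) { return false; }
    public boolean release(String nodeId) { return false; }
    public Map<String, Instant> listQuarantined() { return Map.of(); }
}

/** ConcurrentHashMap-based implementation with a cap on the number of entries. */
final class DefaultNodeHealthManager implements NodeHealthManager {
    private final Map<String, Instant> expiries = new ConcurrentHashMap<>();
    private final int maxEntries;

    DefaultNodeHealthManager(int maxEntries) { this.maxEntries = maxEntries; }

    @Override
    public synchronized boolean quarantine(String nodeId, Duration duration, Instant now) {
        // Only this method adds entries, so synchronizing it alone keeps the cap exact.
        // Re-quarantining an existing node just refreshes its expiry.
        if (!expiries.containsKey(nodeId) && expiries.size() >= maxEntries) {
            return false;
        }
        expiries.put(nodeId, now.plus(duration));
        return true;
    }

    @Override
    public boolean isQuarantined(String nodeId, Instant now) {
        Instant expiresAt = expiries.get(nodeId);
        // Expired entries count as healthy even before a cleanup task removes them.
        return expiresAt != null && expiresAt.isAfter(now);
    }

    @Override
    public boolean release(String nodeId) {
        return expiries.remove(nodeId) != null;
    }

    @Override
    public Map<String, Instant> listQuarantined() {
        return Map.copyOf(expiries);
    }
}

public class NodeHealthManagerSketch {
    public static void main(String[] args) {
        boolean enabled = true; // stands in for cluster.node-health.enabled
        NodeHealthManager manager =
                enabled ? new DefaultNodeHealthManager(1000) : new NoOpNodeHealthManager();
        Instant now = Instant.now();
        manager.quarantine("worker-2", Duration.ofMinutes(10), now);
        System.out.println(manager.isQuarantined("worker-2", now));
    }
}
{code}

Choosing the implementation once, based on the enabled flag, keeps the SlotManager integration free of feature-flag checks.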
Key Classes:
* org.apache.flink.runtime.resourcemanager.health.NodeHealthManager
* org.apache.flink.runtime.resourcemanager.health.NodeHealthStatus
* org.apache.flink.runtime.rest.handler.cluster.NodeQuarantineHandler
Configuration Options:
{code}
cluster.node-health.enabled: false (default)
cluster.node-health.default-duration: 10min
cluster.node-health.max-entries: 1000
{code}
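In Flink these options would normally be declared through ConfigOptions.key(...). The plain-Java sketch below only illustrates the defaults listed above and the kind of checks the configuration validation tests could cover. The NodeHealthConfig record and its validation rules are hypothetical. The duration parser is a simplified stand-in that accepts only ms, s, min and h suffixes, not Flink's full duration syntax. The sketch assumes Java 16+.

{code:java}
import java.time.Duration;
import java.util.Locale;
import java.util.Map;

public class NodeHealthConfigSketch {

    /** Hypothetical holder for the three options; defaults mirror the values listed above. */
    record NodeHealthConfig(boolean enabled, Duration defaultDuration, int maxEntries) {

        static NodeHealthConfig fromMap(Map<String, String> conf) {
            boolean enabled = Boolean.parseBoolean(
                    conf.getOrDefault("cluster.node-health.enabled", "false"));
            Duration duration = parseDuration(
                    conf.getOrDefault("cluster.node-health.default-duration", "10min"));
            int maxEntries = Integer.parseInt(
                    conf.getOrDefault("cluster.node-health.max-entries", "1000"));
            // Assumed validation rules: both values must be strictly positive.
            if (duration.isNegative() || duration.isZero()) {
                throw new IllegalArgumentException("default-duration must be positive");
            }
            if (maxEntries <= 0) {
                throw new IllegalArgumentException("max-entries must be positive");
            }
            return new NodeHealthConfig(enabled, duration, maxEntries);
        }
    }

    /** Simplified parser for values like "500ms", "30s", "10min", "1h". */
    static Duration parseDuration(String raw) {
        String s = raw.trim().toLowerCase(Locale.ROOT);
        // "ms" must be checked before "s", since both end with the letter s.
        if (s.endsWith("ms")) return Duration.ofMillis(number(s, 2));
        if (s.endsWith("min")) return Duration.ofMinutes(number(s, 3));
        if (s.endsWith("s")) return Duration.ofSeconds(number(s, 1));
        if (s.endsWith("h")) return Duration.ofHours(number(s, 1));
        throw new IllegalArgumentException("Unsupported duration: " + raw);
    }

    /** Parses the numeric prefix after stripping a unit suffix of the given length. */
    private static long number(String s, int suffixLength) {
        return Long.parseLong(s.substring(0, s.length() - suffixLength).trim());
    }

    public static void main(String[] args) {
        System.out.println(NodeHealthConfig.fromMap(Map.of()));
        System.out.println(NodeHealthConfig.fromMap(Map.of(
                "cluster.node-health.enabled", "true",
                "cluster.node-health.default-duration", "30 min")));
    }
}
{code}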
REST API Endpoints:
{code}
POST /cluster/nodes/{nodeId}/quarantine
GET /cluster/nodes/quarantine
DELETE /cluster/nodes/{nodeId}/quarantine
{code}
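A client could call these endpoints with the JDK's java.net.http API (Java 11+), as sketched below. The base URL assumes a local cluster on Flink's default REST port 8081, and node IDs are assumed to be URL-safe. This issue does not define the request or response bodies, so the POST sends no body. The class and method names are hypothetical. The main method needs a running cluster that actually exposes these endpoints.

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class QuarantineRestClientSketch {

    private final String baseUrl; // e.g. "http://localhost:8081"

    QuarantineRestClientSketch(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    /** POST /cluster/nodes/{nodeId}/quarantine (no body: the schema is not specified yet). */
    HttpRequest quarantine(String nodeId) {
        return HttpRequest.newBuilder(nodeUri(nodeId))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    /** GET /cluster/nodes/quarantine lists the currently quarantined nodes. */
    HttpRequest list() {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/cluster/nodes/quarantine"))
                .GET()
                .build();
    }

    /** DELETE /cluster/nodes/{nodeId}/quarantine lifts a quarantine before it expires. */
    HttpRequest release(String nodeId) {
        return HttpRequest.newBuilder(nodeUri(nodeId)).DELETE().build();
    }

    /** URI.create throws IllegalArgumentException if nodeId contains characters illegal in a URI. */
    private URI nodeUri(String nodeId) {
        return URI.create(baseUrl + "/cluster/nodes/" + nodeId + "/quarantine");
    }

    public static void main(String[] args) throws Exception {
        QuarantineRestClientSketch client = new QuarantineRestClientSketch("http://localhost:8081");
        HttpClient http = HttpClient.newHttpClient();
        HttpResponse<String> response = http.send(client.list(), HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
{code}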
Acceptance Criteria:
* NodeHealthManager interface and implementations
* Integration with SlotManagerImpl for slot filtering
* REST API handlers with proper request/response bodies
* Configuration options with backward compatibility (disabled by default)
* Unit tests achieving >90% code coverage
* Integration test verifying end-to-end functionality
* Documentation updates for new configuration options
* No performance impact when feature is disabled
Related Issues:
* FLIP-224: Blocklist Mechanism (used by FLIP-168: Speculative Execution for Batch Job)
* Future work: Automatic failure detection and health scoring