GitHub user GreatEugenius edited a discussion: Flink Multi-Version Support
## Background and Goals
Currently, Flink Agents depends on Flink version 1.20.3, while Flink has
entered the 2.x era with many new features and improvements. To enable users to
leverage more Flink features in their Flink Agents jobs, we plan to support
multiple Flink versions, including both Flink 1.20.x and 2.x series, allowing
users to choose the appropriate Flink Agents version based on their actual
Flink cluster version.
## Multi-Version Support Strategy
### Java Architecture Design
Flink Agents adopts a layered architecture design, implementing multi-version
support through the `dist` module:
```plaintext
flink-agents
├── api/                 # Unified API layer
├── plan/                # Execution plan module
├── runtime/             # Runtime module
├── dist/                # Version compatibility and packaging module (unified)
│   ├── flink-1.20/      # Flink 1.20.x
│   │   ├── src/         # Version-specific code
│   │   └── pom.xml      # Complete configuration
│   ├── flink-2.0/       # Flink 2.0.x
│   │   ├── src/         # Version-specific code
│   │   └── pom.xml      # Complete configuration
│   ├── flink-2.1/       # Flink 2.1.x
│   │   ├── src/         # Version-specific code
│   │   └── pom.xml      # Complete configuration
│   └── flink-2.2/       # Flink 2.2.x (default)
│       └── pom.xml      # Complete configuration
├── integrations/        # Integration modules
├── examples/            # Example code
└── python/              # Python implementation
```
#### Module Responsibilities
##### API Module
* Provides unified API interfaces
* Defines core abstractions and interface specifications
* **Built against Flink 2.2**, the project's default version
##### Plan Module
* Responsible for generating and optimizing execution plans
* Provides unified plan interfaces
* **Built against Flink 2.2**, the project's default version
##### Runtime Module
* Implements core runtime logic
* Provides basic runtime capabilities
* **Built against Flink 2.2**, the project's default version
##### Dist Module
**Responsibilities:**
* **Owns both the version compatibility implementation and the building of the final release packages**
* Encapsulates Flink version-specific implementations (in each submodule's `src` directory)
* Handles API differences between Flink versions
* Rewrites or extends version-dependent core classes (e.g., `ActionExecutionOperator`)
* Generates standalone uber jars using the Maven Shade plugin

**Implementation Approach:**
* Each submodule (e.g., `flink-1.20`) contains:
  * `src/` directory: stores version-specific compatibility code
  * `pom.xml`: configures dependencies and packaging rules
* Depends on core modules like `api`, `plan`, `runtime`
* Depends on the corresponding Flink framework version (with `provided` scope)
* Uses the Maven Shade plugin to exclude conflicting classes from `runtime`, so that the submodule's own version-specific implementations are packaged instead
* Unpacks and reuses test classes from the core modules using `maven-dependency-plugin`
* Rewrites the classes and methods affected by version differences

**Packaging Strategy:**
* **Non-default versions** (e.g., `flink-1.20`):
  * Include version-specific code from the local `src` directory
  * Depend on core modules `api`, `plan`, `runtime`
  * Depend on the corresponding Flink framework version
* **Default version** (`flink-2.2`): depends directly on core modules like `api`, `plan`, `runtime`, with no version-specific `src` directory
* All versions:
  * Include all necessary integration modules (e.g., `ollama`)
  * Generate complete, deployable jar packages
  * Naming format: `flink-agents-dist-flink-{version}-{project.version}.jar`
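
The naming format can be captured in a small helper. The Python sketch below is illustrative only. `dist_jar_name` and `SUPPORTED_FLINK_MINORS` are hypothetical names and are not part of the Maven build. The version list mirrors the `dist/` submodules described above, and `{version}` is assumed to be the Flink MAJOR.MINOR version (e.g. `1.20`), as the submodule names suggest.

```python
import re

# Hypothetical: Flink minor versions that have a dist submodule, per the dist/ layout.
SUPPORTED_FLINK_MINORS = ("1.20", "2.0", "2.1", "2.2")


def dist_jar_name(flink_minor: str, project_version: str) -> str:
    """Build the uber-jar file name for one dist submodule.

    Follows flink-agents-dist-flink-{version}-{project.version}.jar,
    where {version} is assumed to be a MAJOR.MINOR Flink version.
    """
    # Reject full patch versions such as "1.20.3"; one jar covers a minor line.
    if not re.fullmatch(r"\d+\.\d+", flink_minor):
        raise ValueError(f"expected a MAJOR.MINOR Flink version, got {flink_minor!r}")
    return f"flink-agents-dist-flink-{flink_minor}-{project_version}.jar"


if __name__ == "__main__":
    # Print the expected jar name for every supported Flink minor version.
    for minor in SUPPORTED_FLINK_MINORS:
        print(dist_jar_name(minor, "0.2-SNAPSHOT"))
```

With `0.2-SNAPSHOT` as the project version, this yields the file names listed under Build Artifacts.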
##### Integrations Module
* Provides integration capabilities with external systems
* Includes Chat Models (Ollama, etc.), Embedding Models, and Vector Stores
* Independent of the Flink version, so it can be used with every supported Flink version
### Python Side
Because Python is interpreted, the Python side does not need a separate build per Flink version. Instead, it handles compatibility by detecting the Flink version at runtime. The Python package embeds the dist jars for all supported Flink versions and automatically loads the one that matches the detected version.
#### Version Management Mechanism
We introduce a utility class, `FlinkVersionManager`, that detects the Flink version at runtime so that developers can write version-specific code paths.
```python
from importlib.metadata import PackageNotFoundError, version as dist_version
from typing import Optional

from packaging import version


class FlinkVersionManager:
    """Detects the Flink version available at runtime and compares against it."""

    def __init__(self) -> None:
        self._flink_version: Optional[str] = None
        self._initialized = False

    def _initialize(self) -> None:
        if self._initialized:
            return
        # Try multiple ways to get the version: Java side first, then PyFlink
        self._flink_version = (
            self._get_java_flink_version() or self._get_pyflink_version()
        )
        self._initialized = True

    def _get_java_flink_version(self) -> Optional[str]:
        # Java-side detection is not shown in this excerpt
        return None

    def _get_pyflink_version(self) -> Optional[str]:
        # importlib.metadata replaces the deprecated pkg_resources API
        try:
            return dist_version("apache-flink")
        except PackageNotFoundError:
            return None

    @property
    def version(self) -> Optional[str]:
        self._initialize()
        return self._flink_version

    @property
    def major_version(self) -> Optional[str]:
        """MAJOR.MINOR part of the version, e.g. "1.20" for "1.20.3"."""
        if not self.version:
            return None
        version_parts = self.version.split("-")[0].split(".")
        if len(version_parts) >= 2:
            return f"{version_parts[0]}.{version_parts[1]}"
        return self.version

    def ge(self, target_version: str) -> bool:
        """Greater than or equal; False if the Flink version is unknown."""
        if not self.version:
            return False
        current = self._normalize_version(self.version)
        target = self._normalize_version(target_version)
        return version.parse(current) >= version.parse(target)

    def lt(self, target_version: str) -> bool:
        """Less than; also False if the Flink version is unknown."""
        if not self.version:
            return False
        return not self.ge(target_version)

    def _normalize_version(self, version_str: str) -> str:
        """Normalize to MAJOR.MINOR.PATCH, dropping suffixes like -SNAPSHOT."""
        parts = version_str.split("-")[0].split(".")
        if len(parts) == 2:
            parts.append("0")
        return ".".join(parts[:3])


# Global instance
flink_version_manager = FlinkVersionManager()
```
**Usage Example:**
```python
from flink_agents.version import flink_version_manager

if flink_version_manager.ge("2.0.0"):
    # Processing logic for Flink 2.0+
    result = call_new_version_api()
else:
    # Processing logic for versions before Flink 2.0
    # (also reached if the Flink version cannot be detected)
    result = call_old_version_api()
```
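`_normalize_version` passes its result to `packaging.version.parse` rather than comparing raw strings. The stdlib-only sketch below shows why. It swaps `packaging` for a hypothetical tuple-based helper, `release_tuple`, which mirrors `_normalize_version` but is not part of Flink Agents.

Compared as strings, `"2.10.0"` sorts before `"2.2.0"`, while integer tuples order correctly. Like `_normalize_version`, the helper drops suffixes such as `-SNAPSHOT`, so `2.0.0-SNAPSHOT` compares equal to `2.0.0`.

```python
def release_tuple(version_str: str) -> tuple[int, ...]:
    """Turn '2.1', '1.20.3' or '2.0.0-SNAPSHOT' into a 3-part integer tuple.

    Mirrors FlinkVersionManager._normalize_version: drop any '-suffix',
    pad MAJOR.MINOR with a zero patch, keep at most three parts.
    """
    parts = version_str.split("-")[0].split(".")
    if len(parts) == 2:
        parts.append("0")
    return tuple(int(p) for p in parts[:3])


def ge(current: str, target: str) -> bool:
    """Numeric 'greater than or equal' on normalized release tuples."""
    return release_tuple(current) >= release_tuple(target)


if __name__ == "__main__":
    # Plain string comparison gets multi-digit minor versions wrong ...
    print("2.10.0" >= "2.2.0")          # False
    # ... while comparing integer tuples does not.
    print(ge("2.10.0", "2.2.0"))        # True
    print(ge("1.20.3", "2.0"))          # False
    print(ge("2.0.0-SNAPSHOT", "2.0"))  # True
```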
#### Automatic Java Jar Loading
At runtime, the Python code detects the Flink version and loads the jar packages from the matching version subdirectory of `flink_agents/lib/` (for example, `flink_agents/lib/flink-2.2/`).
```python
import importlib
from importlib.resources import files
from typing import Any

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

from flink_agents.version import flink_version_manager


class AgentsExecutionEnvironment:
    # (excerpt: only the factory method is shown)
    @staticmethod
    def get_execution_environment(
        env: StreamExecutionEnvironment | None = None,
        t_env: StreamTableEnvironment | None = None,
        **kwargs: Any,
    ) -> "AgentsExecutionEnvironment":
        if env is None:
            # No Flink environment given: use the local execution environment
            return importlib.import_module(
                "flink_agents.runtime.local_execution_environment"
            ).create_instance(env=env, t_env=t_env, **kwargs)

        major_version = flink_version_manager.major_version
        if not major_version:
            err_msg = "Apache Flink is not installed."
            raise ModuleNotFoundError(err_msg)

        # Determine the version-specific lib directory, e.g. lib/flink-2.2/
        version_lib = files("flink_agents.lib") / f"flink-{major_version}"
        if not version_lib.is_dir():
            err_msg = f"Flink Agents dist JAR for Flink {major_version} not found."
            raise FileNotFoundError(err_msg)

        # Register every dist jar for this Flink version with the job
        for jar_file in version_lib.iterdir():
            if jar_file.is_file() and jar_file.name.endswith(".jar"):
                env.add_jars(f"file://{jar_file}")

        return importlib.import_module(
            "flink_agents.runtime.remote_execution_environment"
        ).create_instance(env=env, t_env=t_env, **kwargs)
```
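For jars on a regular filesystem, the same lookup can be written with `pathlib`. The sketch below is an assumption, not the Flink Agents API, and `version_jar_uris` is a hypothetical helper.

It uses `Path.as_uri()`, which yields an absolute, percent-encoded `file://` URI (spaces become `%20`, for example). An f-string such as `f"file://{path}"` does not escape such characters.

```python
from pathlib import Path


def version_jar_uris(lib_base: Path, major_version: str) -> list[str]:
    """Return file:// URIs for every jar under lib_base/flink-{major_version}/.

    Raises FileNotFoundError when no directory exists for that Flink
    version, like get_execution_environment does.
    """
    version_lib = lib_base / f"flink-{major_version}"
    if not version_lib.is_dir():
        raise FileNotFoundError(
            f"Flink Agents dist JAR for Flink {major_version} not found."
        )
    # Sort for a deterministic order; resolve() makes the path absolute,
    # which as_uri() requires.
    return [
        jar.resolve().as_uri()
        for jar in sorted(version_lib.iterdir())
        if jar.is_file() and jar.suffix == ".jar"
    ]
```

Passing `*version_jar_uris(base, "2.2")` to `env.add_jars` would register the same jars as the loop above.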
#### Design Rationale
Currently, Flink Agents adopts a simplified version management approach on the
Python side. This is because:
1. **Actual Requirements Assessment**: Currently, there are no obvious conflicts or incompatibilities at the Python API layer across different Flink versions.
2. **API Reusability**: Most Python APIs can be reused across Flink versions without version-specific handling.
3. **Avoid Over-engineering**: Implementing very specific and complex version control mechanisms now might lead to:
   * Features that do not match actual needs
   * Unnecessary maintenance costs
   * Redesign when future requirements change

**Future Evolution:**
We will consider introducing more complex version control mechanisms when:
* Python APIs show obvious incompatibilities across different Flink versions
* More fine-grained version control is needed
* New Flink versions introduce breaking changes
## Build Artifacts
### Java Artifacts
Generated jar package naming format:
`flink-agents-dist-flink-{version}-{project.version}.jar`

**Currently Supported Versions:**
```plaintext
dist/
├── flink-1.20/target/
│   └── flink-agents-dist-flink-1.20-0.2-SNAPSHOT.jar  # Flink 1.20.x version
├── flink-2.0/target/
│   └── flink-agents-dist-flink-2.0-0.2-SNAPSHOT.jar   # Flink 2.0.x version
├── flink-2.1/target/
│   └── flink-agents-dist-flink-2.1-0.2-SNAPSHOT.jar   # Flink 2.1.x version
└── flink-2.2/target/
    └── flink-agents-dist-flink-2.2-0.2-SNAPSHOT.jar   # Flink 2.2.x version
```
### Python Artifacts
Generated Python package naming format:
`flink_agents-{version}-py3-none-any.whl`

Example:
```plaintext
python/dist/
├── flink_agents-0.2.dev0-py3-none-any.whl  # Wheel package
└── flink_agents-0.2.dev0.tar.gz            # Source package
```
**Package Contents:**
* Python API code (`flink_agents/` directory)
* Embedded Java jar packages, stored in the `flink_agents/lib/` directory and organized by Flink version:
  ```plaintext
  flink_agents/lib/
  ├── flink-1.20/
  │   └── flink-agents-dist-flink-1.20-0.2-SNAPSHOT.jar
  ├── flink-2.0/
  │   └── flink-agents-dist-flink-2.0-0.2-SNAPSHOT.jar
  ├── flink-2.1/
  │   └── flink-agents-dist-flink-2.1-0.2-SNAPSHOT.jar
  └── flink-2.2/
      └── flink-agents-dist-flink-2.2-0.2-SNAPSHOT.jar
  ```
* Python dependencies (declared via `pyproject.toml`):
  * ~`apache-flink>=1.20.3,<=2.2.0`~ (removed to prevent automatic PyFlink installation; users must install PyFlink manually)
  * `pydantic`, `ollama`, `openai`, `anthropic`, `chromadb`, etc. (retained)
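
Because `apache-flink` is no longer a declared dependency, code that needs PyFlink can check for it up front. The sketch below uses the standard `importlib.metadata` API. `installed_pyflink_version` and `require_pyflink` are hypothetical helper names, and the error text is illustrative.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_pyflink_version(dist_name: str = "apache-flink") -> Optional[str]:
    """Return the installed version of the given distribution, or None if absent."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


def require_pyflink() -> str:
    """Fail early with an actionable message when PyFlink is missing."""
    found = installed_pyflink_version()
    if found is None:
        raise ModuleNotFoundError(
            "Apache Flink is not installed. Install the PyFlink version that "
            "matches your Flink cluster, e.g. `pip install apache-flink==1.20.3`."
        )
    return found
```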
### Build Commands
```bash
# Build all versions of Java and Python packages
./tools/build.sh
# Build Java packages only
./tools/build.sh -j
# Build Python packages only
./tools/build.sh -p
```
**Build Process:**
1. **Java Build**: Execute `mvn clean install -DskipTests -B`
   * Build all modules (api, plan, runtime, dist, integrations)
   * Generate dist jar packages for each version
2. **Python Build**:
   * Automatically detect and copy all version jar packages from `dist/flink-*/target/` to `python/flink_agents/lib/flink-*/`
   * Use `uv` to manage Python dependencies and build
   * Generate a Python wheel package containing all version jars
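
The jar-staging step of the Python build can be sketched in Python as follows. This is not `tools/build.sh` itself, and `stage_dist_jars` is a hypothetical helper.

It copies only files that match the documented `flink-agents-dist-flink-*.jar` pattern. That filter rests on an assumption: other jars Maven may leave in `target/` should not be bundled. One example is the `original-*.jar` that the Shade plugin typically keeps.

```python
import shutil
from pathlib import Path


def stage_dist_jars(repo_root: Path) -> list[Path]:
    """Copy dist/flink-*/target/<dist jar> into python/flink_agents/lib/flink-*/.

    Returns the destination paths of the copied jars.
    """
    copied: list[Path] = []
    for target_dir in sorted(repo_root.glob("dist/flink-*/target")):
        version_dir = target_dir.parent.name  # e.g. "flink-2.2"
        dest_dir = repo_root / "python" / "flink_agents" / "lib" / version_dir
        dest_dir.mkdir(parents=True, exist_ok=True)
        # Only the shaded dist jar; skips e.g. original-flink-agents-dist-*.jar
        for jar in sorted(target_dir.glob("flink-agents-dist-flink-*.jar")):
            dest = dest_dir / jar.name
            shutil.copy2(jar, dest)
            copied.append(dest)
    return copied
```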
## Usage Guide
### Choosing the Right Version
Users should choose the corresponding Flink Agents version based on their Flink
cluster version.
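
Matching a cluster to a dist jar depends on the cluster's MAJOR.MINOR version. The sketch below uses a hypothetical helper, `dist_minor_for_cluster`, which is not part of Flink Agents. The supported list mirrors the `dist/` submodules described above. The rule that all patch releases in a minor line share one jar is inferred from the `flink-1.20` / "Flink 1.20.x" naming.

```python
# Hypothetical: minor versions with a dist submodule, per the dist/ layout.
SUPPORTED_FLINK_MINORS = ("1.20", "2.0", "2.1", "2.2")


def dist_minor_for_cluster(cluster_version: str) -> str:
    """Map a Flink cluster version such as '1.20.3' to its dist minor '1.20'."""
    # Drop suffixes like '-SNAPSHOT', then keep MAJOR.MINOR.
    parts = cluster_version.split("-")[0].split(".")
    if len(parts) < 2:
        raise ValueError(f"cannot parse Flink version {cluster_version!r}")
    minor = f"{parts[0]}.{parts[1]}"
    if minor not in SUPPORTED_FLINK_MINORS:
        raise ValueError(
            f"Flink {cluster_version} is not supported; "
            f"supported minors: {', '.join(SUPPORTED_FLINK_MINORS)}"
        )
    return minor
```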
#### Java Users
**Development Phase**: Develop against the unified API module
```xml
<!-- Development dependency: Flink version-independent API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-agents-api</artifactId>
    <version>0.2-SNAPSHOT</version>
    <scope>provided</scope>
</dependency>
```
#### Python Users
Python packages include the jar packages for all supported Flink versions and automatically select the appropriate one at runtime, based on the Flink version in the environment.

**Installation:**
```bash
# Install Flink Agents only; PyFlink is not installed by default
pip install flink_agents
# Install Flink Agents together with PyFlink; pin apache-flink to the
# version of your Flink cluster (e.g. 1.20.3)
pip install flink_agents apache-flink==1.20.3
```
GitHub link: https://github.com/apache/flink-agents/discussions/391