davidzollo opened a new issue, #10182:
URL: https://github.com/apache/seatunnel/issues/10182
## Background
Currently, SeaTunnel supports 200+ connectors written in Java, but lacks
native support for Python-based data sources. Many data engineers and data
scientists prefer Python for its rich ecosystem of data processing libraries
(pandas, requests, boto3, etc.) and ease of use.
This feature request proposes to enable users to write custom data sources
in Python while leveraging SeaTunnel's powerful distributed execution engine.
## Motivation
### Why Python Support?
1. **Rich Ecosystem**: Python has extensive libraries for data processing,
APIs, databases, and cloud services
2. **Lower Barrier**: Many data engineers are more comfortable with Python
than Java
3. **Rapid Prototyping**: Quickly test new data sources without writing Java
code
4. **Data Science Integration**: Seamlessly integrate ML model outputs or
preprocessing scripts
5. **Community Demand**: Aligns with the existing Python SDK proposal in
`me/pythonsdk.md`
### Use Cases
- **Custom API Integration**: Fetch data from APIs without existing
connectors
- **Python Libraries**: Leverage boto3, pymongo, elasticsearch-py, etc.
- **Data Science Workflows**: Integrate ML predictions, feature engineering
- **Legacy Systems**: Wrap Python scripts that already fetch data
- **Rapid Development**: Prototype connectors before writing production Java
code
## Proposed Solution
### Phase 1: IPC-based Python Source (MVP)
Enable Python scripts to act as SeaTunnel sources through Inter-Process
Communication (IPC).
#### Architecture
\`\`\`
┌─────────────────────────────────────┐
│ SeaTunnel Engine (Java) │
│ ┌──────────────────────────────┐ │
│ │ PythonSource │ │
│ │ - createReader() │ │
│ │ - createEnumerator() │ │
│ └──────────┬───────────────────┘ │
│ │ │
│ ┌──────────▼───────────────────┐ │
│ │ PythonSourceReader │ │
│ │ - pollNext() │ │
│ │ - snapshotState() │ │
│ └──────────┬───────────────────┘ │
│ │ IPC │
│ │ (ProcessBuilder) │
└─────────────┼────────────────────────┘
│
│ stdout/stdin
│ (JSON Lines)
│
┌─────────────▼────────────────────────┐
│ Python Process │
│ ┌──────────────────────────────┐ │
│ │ User Python Script │ │
│ │ - read_data() │ │
│ │ - get_schema() │ │
│ └──────────────────────────────┘ │
└──────────────────────────────────────┘
\`\`\`
#### Communication Protocol
**Data Exchange Format**: JSON Lines (one JSON object per line)
**Schema Discovery**:
\`\`\`json
// Request: {"type": "GET_SCHEMA"}
// Response:
{
"type": "SCHEMA",
"fields": [
{"name": "id", "type": "BIGINT"},
{"name": "name", "type": "STRING"},
{"name": "timestamp", "type": "TIMESTAMP"}
]
}
\`\`\`
**Data Reading**:
\`\`\`json
// Request: {"type": "READ_NEXT"}
// Response:
{
"type": "DATA",
"rows": [
{"id": 1, "name": "Alice", "timestamp": "2025-01-01T00:00:00Z"},
{"id": 2, "name": "Bob", "timestamp": "2025-01-02T00:00:00Z"}
]
}
\`\`\`
**End of Stream**:
\`\`\`json
{"type": "END_OF_STREAM"}
\`\`\`
### Example Configuration
\`\`\`hocon
source {
PythonSource {
# Python executable path (default: python3)
python.executable = "python3"
# Path to user's Python script
python.script.path = "/path/to/user_script.py"
# Configuration passed to Python script
python.script.config = {
api_url = "https://api.example.com"
api_key = "\${API_KEY}"
batch_size = 100
}
# Output table name
result_table_name = "python_table"
}
}
\`\`\`
### Example Python Script
\`\`\`python
#!/usr/bin/env python3
import sys
import json
import requests
def get_schema():
"""Return the schema of the data source."""
return {
"type": "SCHEMA",
"fields": [
{"name": "id", "type": "BIGINT"},
{"name": "name", "type": "STRING"},
{"name": "email", "type": "STRING"},
{"name": "created_at", "type": "TIMESTAMP"}
]
}
def read_data(config):
"""
Read data from the source and yield rows.
Args:
config (dict): Configuration passed from SeaTunnel
Yields:
dict: Data rows in batch
"""
api_url = config.get("api_url")
api_key = config.get("api_key")
batch_size = config.get("batch_size", 100)
# Example: Fetch data from API
response = requests.get(
api_url,
headers={"Authorization": f"Bearer {api_key}"}
)
data = response.json()
# Yield data in batches
for i in range(0, len(data), batch_size):
batch = data[i:i+batch_size]
yield {
"type": "DATA",
"rows": batch
}
# Signal end of stream
yield {"type": "END_OF_STREAM"}
def main():
"""Main entry point for SeaTunnel communication."""
for line in sys.stdin:
request = json.loads(line)
if request["type"] == "GET_SCHEMA":
schema = get_schema()
print(json.dumps(schema), flush=True)
elif request["type"] == "READ_NEXT":
config = request.get("config", {})
for row in read_data(config):
print(json.dumps(row), flush=True)
if row["type"] == "END_OF_STREAM":
break
if __name__ == "__main__":
main()
\`\`\`
## Implementation Plan
### Phase 1: Core Implementation (4-6 weeks)
#### Week 1-2: Foundation
- Create \`seatunnel-connectors-v2/connector-python\` module
- Implement \`PythonSource\` class (extends \`SeaTunnelSource\`)
- Implement configuration parsing and validation
- Implement process management with \`ProcessBuilder\`
#### Week 3-4: Data Exchange
- Implement \`PythonSourceReader\` class (extends \`SourceReader\`)
- Implement JSON serialization/deserialization
- Implement buffering with \`LinkedBlockingQueue\`
- Implement error handling and retry logic
#### Week 5-6: Testing and Documentation
- Write unit tests (\`PythonSourceTest\`, \`PythonSourceReaderTest\`)
- Write integration tests (\`PythonSourceIT\`)
- Write user documentation with examples
- Write developer documentation
### Module Structure
\`\`\`
seatunnel-connectors-v2/
└── connector-python/
├── pom.xml
└── src/
├── main/
│ ├── java/org/apache/seatunnel/connectors/seatunnel/python/
│ │ ├── source/
│ │ │ ├── PythonSource.java
│ │ │ ├── PythonSourceReader.java
│ │ │ ├── PythonSourceSplit.java
│ │ │ └── PythonSourceSplitEnumerator.java
│ │ ├── config/
│ │ │ └── PythonSourceConfig.java
│ │ ├── serialize/
│ │ │ └── JsonToRowDeserializer.java
│ │ └── exception/
│ │ └── PythonExecutionException.java
│ └── resources/
│ └── META-INF/services/
│ └── org.apache.seatunnel.api.table.factory.Factory
└── test/
├── java/org/apache/seatunnel/connectors/seatunnel/python/
│ ├── PythonSourceTest.java
│ ├── PythonSourceReaderTest.java
│ └── PythonSourceIT.java
└── resources/python/
├── test_source.py
└── error_source.py
\`\`\`
## Type Mapping
| Python Type | SeaTunnel Type | Notes |
|-------------|----------------|-------|
| int | BIGINT | 64-bit integer |
| float | DOUBLE | Double precision |
| str | STRING | UTF-8 string |
| bool | BOOLEAN | True/False |
| bytes | BYTES | Binary data |
| datetime | TIMESTAMP | ISO 8601 format |
| date | DATE | YYYY-MM-DD |
| list | ARRAY | Homogeneous array |
| dict | MAP | Key-value pairs |
| None | NULL | Null value |
## Performance Considerations
### MVP Performance Targets
- **Throughput**: 1K-10K rows/second (acceptable for most use cases)
- **Latency**: <100ms per batch read
- **Memory**: Buffer up to 10K rows in queue
### Future Optimization Opportunities
- Use Protocol Buffers instead of JSON (5-10x faster serialization)
- Shared memory (e.g., Apache Arrow) for zero-copy data transfer
- Multi-process parallelism with split assignment
- Batch size tuning and adaptive batching
## Implementation References
- **Process management**: \`RsyncFileTransfer.java:58-96\` in
\`connector-clickhouse\`
- **Buffered reading**: \`GraphQLSourceSocketReader.java:109-117\` in
\`connector-graphql\`
- **Source API**:
\`seatunnel-api/src/main/java/org/apache/seatunnel/api/source/\`
- **Existing Python SDK proposal**: \`me/pythonsdk.md\`
## Success Criteria
- ✅ Users can run Python scripts as SeaTunnel sources
- ✅ Support basic data types (string, int, float, boolean, timestamp, array,
map)
- ✅ Handle at least 1K rows/second throughput
- ✅ Provide clear error messages for common failures
- ✅ Pass all unit and integration tests (coverage > 80%)
- ✅ Complete user documentation with 3+ example scripts
- ✅ No breaking changes to existing connectors
## Security Considerations
### MVP Security Measures
- Validate Python script path (prevent directory traversal)
- Limit process resources (memory, CPU via OS mechanisms)
- Sanitize configuration passed to Python
- Capture and log stderr for debugging
### Future Enhancements
- Sandboxed execution (Docker, containers)
- Script signature verification
- Resource quotas and rate limiting
## Future Phases
### Phase 2: Performance Enhancement
- Protocol Buffers support for faster serialization
- Multi-process parallelism
- Checkpoint and fault tolerance
- Performance benchmarking
### Phase 3: Python SDK
- Complete Python SDK (as proposed in \`me/pythonsdk.md\`)
- PyPI package: \`seatunnel-python-sdk\`
- High-level API for defining sources, sinks, transforms
- Integration with Apache Arrow for high-performance data exchange
## Open Questions
1. **Virtual Environments**: Should we support Python virtual environments
(venv/conda)?
2. **Python Distribution**: Should we bundle a Python interpreter or require
system Python?
3. **Batch Size**: What should be the default batch size for data reading?
4. **Async Support**: Should we support async Python (asyncio) in the future?
5. **Python Version**: What minimum Python version should we support?
(Recommend Python 3.8+)
## Alternatives Considered
### Alternative 1: HTTP/REST API
- **Pros**: Language agnostic, network-based
- **Cons**: Higher latency, requires running a separate service
### Alternative 2: Py4J/JPype
- **Pros**: Direct Java-Python bridge
- **Cons**: Complex dependency management, version compatibility issues
### Alternative 3: Apache Arrow Flight
- **Pros**: High performance, designed for data exchange
- **Cons**: More complex implementation, overkill for MVP
**Decision**: IPC via ProcessBuilder is the simplest and most maintainable
approach for MVP.
## Community Feedback Welcome
We welcome feedback from the community on:
- 📊 **Use cases and requirements**: What data sources would you connect with
Python?
- ⚡ **Performance expectations**: What throughput do you need?
- 🔒 **Security concerns**: What security features are important?
- 🎨 **API design**: Any suggestions for the Python script interface?
- 🛠️ **Implementation**: Willing to contribute? Let's collaborate!
## Related Issues
- Python SDK proposal: \`me/pythonsdk.md\` in the codebase
- Related to expanding SeaTunnel's connector ecosystem
---
**Labels**: feature, connector
**Component**: connector-python
**Priority**: Medium
**Estimated Effort**: 4-6 weeks for MVP
**Breaking Changes**: None
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]