liugddx opened a new issue, #9861: URL: https://github.com/apache/seatunnel/issues/9861
### Search before asking

- [x] I had searched in the [feature](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22) and found no similar feature requirement.

### Description

## 1. Overview

### 1.1 Function Description

TikaDocument Transform is a document parsing transformer based on Apache Tika that extracts structured data from documents in a variety of formats (PDF, Word, Excel, PPT, plain text, etc.). It parses document content into configurable structured fields, simplifying downstream data processing and analysis.

### 1.2 Key Features

- Multiple format support: Supports common document formats such as PDF, DOC/DOCX, XLS/XLSX, PPT/PPTX, TXT, HTML, and RTF
- Flexible output structure: Configurable output fields, including text content, metadata, document properties, etc.
- Error handling: Supports multiple error handling strategies (skip, fail, record error)
- Performance optimization: Built-in document type detection and caching mechanisms
- Content filtering: Supports basic cleanup and formatting of extracted text

## 2. Use Cases

- Document data mining: Extract key information from large volumes of documents for analysis
- Content management systems: Build document indexing and search capabilities
- Compliance checks: Extract document metadata for compliance validation
- Data migration: Migrate document contents into structured data stores

## 3. Configuration Design

### 3.1 Basic Configuration Options

```
transform {
  TikaDocument {
    # Source field configuration
    source_field = "document_content"  # Required: the field containing document data (byte[] or base64 string)

    # Output field configuration
    output_fields = {
      content = "extracted_text"       # Document text content
      title = "document_title"         # Document title
      author = "document_author"       # Document author
      creation_date = "created_at"     # Creation time
      content_type = "mime_type"       # MIME type
      language = "detected_language"   # Detected language
      page_count = "total_pages"       # Page count (applicable to PDF, etc.)
      file_size = "document_size"      # Document size
      keywords = "document_keywords"   # Keywords
      subject = "document_subject"     # Document subject
    }

    # Parsing options
    parse_options = {
      extract_text = true              # Whether to extract text content
      extract_metadata = true          # Whether to extract metadata
      max_string_length = 100000       # Maximum string length limit
      ocr_enabled = false              # Whether to enable OCR (for text in images)
      preserve_formatting = false      # Whether to preserve formatting information
    }

    # Content processing options
    content_processing = {
      remove_empty_lines = true        # Remove empty lines
      trim_whitespace = true           # Trim leading/trailing whitespace
      normalize_whitespace = true      # Normalize whitespace characters
      min_content_length = 10          # Minimum content length
      encoding = "UTF-8"               # Output encoding
    }

    # Error handling configuration
    error_handling = {
      on_parse_error = "skip"          # skip/fail/null
      on_unsupported_format = "skip"   # skip/fail/null
      log_errors = true                # Whether to log errors
    }

    # Advanced options
    advanced = {
      timeout_ms = 30000               # Parsing timeout (ms)
      enable_tika_config = false       # Whether to use a custom Tika config
      tika_config_path = "/path/to/tika-config.xml"  # Tika config file path
    }
  }
}
```

### 3.2 Simplified Configuration Example

```
transform {
  TikaDocument {
    source_field = "file_data"
    output_fields = {
      content = "text_content"
      content_type = "file_type"
      title = "title"
    }
  }
}
```
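The `content_processing` options in section 3.1 map naturally onto a small post-processing step. Below is a minimal sketch of what the planned `DefaultContentProcessor` (see the class layout in section 5.1) could look like. The `ContentProcessor` interface shape (a single `process(String)` method) and the constructor are assumptions for illustration, and the `encoding` option is omitted:

```
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical sketch: the ContentProcessor interface with a single
// process(String) method is an assumption, not the final design.
public class DefaultContentProcessor implements ContentProcessor {

    private final boolean removeEmptyLines;
    private final boolean trimWhitespace;
    private final boolean normalizeWhitespace;
    private final int minContentLength;

    public DefaultContentProcessor(
            boolean removeEmptyLines,
            boolean trimWhitespace,
            boolean normalizeWhitespace,
            int minContentLength) {
        this.removeEmptyLines = removeEmptyLines;
        this.trimWhitespace = trimWhitespace;
        this.normalizeWhitespace = normalizeWhitespace;
        this.minContentLength = minContentLength;
    }

    @Override
    public String process(String rawContent) {
        if (rawContent == null) {
            return null;
        }
        String content = rawContent;
        if (normalizeWhitespace) {
            // Collapse runs of spaces/tabs into a single space, keeping line breaks.
            content = content.replaceAll("[ \\t]+", " ");
        }
        if (removeEmptyLines || trimWhitespace) {
            content = Arrays.stream(content.split("\\R"))
                    .map(line -> trimWhitespace ? line.trim() : line)
                    .filter(line -> !removeEmptyLines || !line.isEmpty())
                    .collect(Collectors.joining("\n"));
        }
        // Content shorter than the configured minimum is treated as noise.
        return content.length() < minContentLength ? null : content;
    }
}
```

Returning `null` for content below `min_content_length` would let the transform apply the configured error-handling strategy to near-empty documents.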
## 4. Input and Output Structures

### 4.1 Input Data Structure

```markdown
Example input rows:

| id | file_name      | file_data        | other_field |
|----|----------------|------------------|-------------|
| 1  | "document.pdf" | [binary_data...] | "value1"    |
| 2  | "report.docx"  | [binary_data...] | "value2"    |
```

### 4.2 Output Data Structure

```
Example output rows:

| id | file_name      | file_data        | other_field | text_content   | file_type         | title           | created_at   |
|----|----------------|------------------|-------------|----------------|-------------------|-----------------|--------------|
| 1  | "document.pdf" | [binary_data...] | "value1"    | "This is a..." | "application/pdf" | "Annual Report" | "2024-01-15" |
| 2  | "report.docx"  | [binary_data...] | "value2"    | "Executive..." | "application/..." | "Q4 Summary"    | "2024-02-20" |
```

## 5. Core Architecture Design

### 5.1 Class Structure Design

```
org.apache.seatunnel.transform.tikadocument/
├── TikaDocumentTransform.java              # Core Transform implementation
├── TikaDocumentTransformFactory.java       # Factory implementation
├── TikaDocumentTransformConfig.java        # Configuration class
├── TikaDocumentTransformErrorCode.java     # Error code definitions
├── TikaDocumentMultiCatalogTransform.java  # Multi-table support
├── extractor/
│   ├── DocumentExtractor.java              # Document extractor interface
│   ├── TikaDocumentExtractor.java          # Tika implementation
│   └── DocumentMetadata.java               # Metadata wrapper
├── processor/
│   ├── ContentProcessor.java               # Content processor interface
│   └── DefaultContentProcessor.java        # Default implementation
└── exception/
    └── TikaDocumentException.java          # Exception class
```

### 5.2 Core Implementation Ideas

1. Inherit MultipleFieldOutputTransform: Supports multi-field output, suitable for extracting multiple document properties
2. Document type detection: Use Tika's auto-detection to identify document formats
3. Streaming processing: Support streaming parsing for large files to avoid OOM
4. Caching mechanism: Cache documents with identical content to improve performance
5. Error recovery: Provide multiple error handling strategies to ensure continuity of data processing

## 6. Detailed Implementation Plan

### 6.1 Key Points of the Transform Main Class

```
public class TikaDocumentTransform extends MultipleFieldOutputTransform {

    private final TikaDocumentTransformConfig config;
    private final DocumentExtractor extractor;
    private final ContentProcessor processor;

    @Override
    protected Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow) {
        // 1. Get the source field data
        // 2. Parse the document using Tika
        // 3. Extract the configured field information
        // 4. Process and format the content
        // 5. Return the result array
    }

    @Override
    protected Column[] getOutputColumns() {
        // Generate output column definitions based on the configuration
    }
}
```

### 6.2 Document Extractor Interface Design

```
public interface DocumentExtractor {
    DocumentMetadata extract(byte[] documentData, ParseOptions options);
    boolean isSupported(String mimeType);
    void setTimeout(long timeoutMs);
}

public class DocumentMetadata {
    private String content;
    private String title;
    private String author;
    private Date creationDate;
    private String contentType;
    private Map<String, Object> customMetadata;
    // getters and setters...
}
```
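To show how this interface could be backed by Tika, here is a minimal sketch of `TikaDocumentExtractor` using Tika's `AutoDetectParser` and `BodyContentHandler`. The `ParseOptions` accessor name and the (unchecked) `TikaDocumentException` are assumptions taken from the design above; timeout enforcement (e.g., parsing on a bounded executor) is omitted for brevity:

```
import java.io.ByteArrayInputStream;
import java.io.InputStream;

import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.parser.AutoDetectParser;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.sax.BodyContentHandler;

public class TikaDocumentExtractor implements DocumentExtractor {

    // AutoDetectParser identifies the document format and delegates to the
    // matching parser from tika-parsers-standard-package.
    private final AutoDetectParser parser = new AutoDetectParser();
    private long timeoutMs = 30_000L;

    @Override
    public DocumentMetadata extract(byte[] documentData, ParseOptions options) {
        // Bound extracted text at max_string_length to limit memory usage.
        // Note: exceeding the write limit raises a SAXException; a full
        // implementation would treat that as truncation, not failure.
        BodyContentHandler handler = new BodyContentHandler(options.getMaxStringLength());
        Metadata metadata = new Metadata();
        try (InputStream stream = new ByteArrayInputStream(documentData)) {
            parser.parse(stream, handler, metadata, new ParseContext());
        } catch (Exception e) {
            throw new TikaDocumentException("Failed to parse document", e);
        }
        DocumentMetadata result = new DocumentMetadata();
        result.setContent(handler.toString());
        result.setTitle(metadata.get(TikaCoreProperties.TITLE));
        result.setAuthor(metadata.get(TikaCoreProperties.CREATOR));
        result.setCreationDate(metadata.getDate(TikaCoreProperties.CREATED));
        // Tika stores the detected MIME type under the "Content-Type" key.
        result.setContentType(metadata.get("Content-Type"));
        return result;
    }

    @Override
    public boolean isSupported(String mimeType) {
        // AutoDetectParser falls back to an empty parser for unknown types,
        // so any non-null MIME type is treated as potentially supported here.
        return mimeType != null;
    }

    @Override
    public void setTimeout(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }
}
```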
## 7. Dependency Management

### 7.1 Maven Dependencies

Add the following dependencies to pom.xml:

```
<!-- Apache Tika core library -->
<dependency>
    <groupId>org.apache.tika</groupId>
    <artifactId>tika-core</artifactId>
    <version>2.9.1</version>
</dependency>

<!-- Tika parser bundle -->
<dependency>
    <groupId>org.apache.tika</groupId>
    <artifactId>tika-parsers-standard-package</artifactId>
    <version>2.9.1</version>
</dependency>

<!-- OCR support (optional) -->
<dependency>
    <groupId>org.apache.tika</groupId>
    <artifactId>tika-parser-ocr-module</artifactId>
    <version>2.9.1</version>
    <optional>true</optional>
</dependency>
```

### 7.2 Plugin Registration

Add to plugin-mapping.properties:

```
seatunnel.transform.TikaDocument = seatunnel-transforms-v2
```

## 8. Usage Examples

### 8.1 Basic Document Parsing

```
env {
  parallelism = 1
}

source {
  LocalFile {
    path = "/data/documents"
    file_format = "binary"
    result_table_name = "document_table"
  }
}

transform {
  TikaDocument {
    source_table = "document_table"
    result_table_name = "parsed_documents"
    source_field = "content"
    output_fields = {
      content = "text_content"
      title = "doc_title"
      author = "doc_author"
      content_type = "mime_type"
      creation_date = "created_date"
    }
    parse_options = {
      extract_text = true
      extract_metadata = true
      max_string_length = 50000
    }
    content_processing = {
      remove_empty_lines = true
      trim_whitespace = true
      min_content_length = 100
    }
  }
}

sink {
  Console {
    source_table = "parsed_documents"
  }
}
```

### 8.2 Batch Document Processing

```
source {
  Jdbc {
    url = "jdbc:mysql://localhost:3306/docs"
    driver = "com.mysql.cj.jdbc.Driver"
    query = "SELECT id, filename, file_data FROM documents WHERE processed = false"
    result_table_name = "raw_documents"
  }
}

transform {
  TikaDocument {
    source_table = "raw_documents"
    result_table_name = "processed_documents"
    source_field = "file_data"
    output_fields = {
      content = "extracted_content"
      title = "document_title"
      page_count = "pages"
      file_size = "size_bytes"
      language = "content_language"
      keywords = "extracted_keywords"
    }
    error_handling = {
      on_parse_error = "null"
      log_errors = true
    }
  }
}

sink {
  Jdbc {
    source_table = "processed_documents"
    url = "jdbc:mysql://localhost:3306/docs"
    driver = "com.mysql.cj.jdbc.Driver"
    query = """
      INSERT INTO processed_documents
        (id, filename, extracted_content, document_title, pages, size_bytes, content_language, extracted_keywords)
      VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    """
  }
}
```

## 9. Error Handling and Monitoring

### 9.1 Error Types

- Parsing errors: Damaged or unsupported document formats
- Timeout errors: Parsing time exceeds the configured limit
- Memory errors: Documents too large for the available memory
- Encoding errors: Character encoding issues

### 9.2 Monitoring Metrics

- Count of successful/failed parses
- Average parsing time
- Processing statistics by document type
- Distribution of error types

## 10. Performance Optimization

### 10.1 Optimization Strategies

1. Document type cache: Cache document type detection results
2. Parser reuse: Reuse Tika parser instances
3. Memory management: Release memory for large documents promptly
4. Concurrent processing: Support multi-threaded concurrent parsing
5. Streaming processing: Use streaming parsing for large documents

### 10.2 Configuration Tuning

```
advanced = {
  timeout_ms = 60000           # Longer timeout for large documents
  max_concurrent_parsers = 4   # Number of concurrent parsers
  memory_limit_mb = 512        # Memory limit
  cache_size = 1000            # Cache size for parsing results
}
```
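As a sketch of the caching mechanism behind `cache_size` (and idea 4 in section 5.2), parse results could be keyed by a hash of the raw document bytes and held in a small LRU map. The `ParseResultCache` class name and its API are illustrative assumptions:

```
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: an LRU cache of parse results keyed by content hash,
// so documents with identical bytes are parsed only once.
public class ParseResultCache {

    private final Map<String, DocumentMetadata> cache;

    public ParseResultCache(int maxEntries) {
        // Access-ordered LinkedHashMap that evicts the least recently used
        // entry once maxEntries is exceeded (maps to the cache_size option).
        this.cache = Collections.synchronizedMap(
                new LinkedHashMap<String, DocumentMetadata>(16, 0.75f, true) {
                    @Override
                    protected boolean removeEldestEntry(
                            Map.Entry<String, DocumentMetadata> eldest) {
                        return size() > maxEntries;
                    }
                });
    }

    public DocumentMetadata get(byte[] documentData) {
        return cache.get(key(documentData));
    }

    public void put(byte[] documentData, DocumentMetadata result) {
        cache.put(key(documentData), result);
    }

    // SHA-256 of the raw bytes identifies documents with identical content.
    private static String key(byte[] documentData) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            return Base64.getEncoder().encodeToString(digest.digest(documentData));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is required but unavailable", e);
        }
    }
}
```

The transform would consult this cache before invoking the extractor and store the result afterwards.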
## 11. Test Plan

### 11.1 Unit Tests

- Configuration parsing tests
- Document extraction functionality tests
- Error handling tests
- Content processing tests

### 11.2 Integration Tests

- End-to-end tests for different document formats
- Performance stress tests
- Error recovery tests
- Large file handling tests

### 11.3 Test Cases

```
@Test
public void testPdfDocumentExtraction() {
    // Test PDF document parsing
}

@Test
public void testWordDocumentExtraction() {
    // Test Word document parsing
}

@Test
public void testErrorHandling() {
    // Test the error handling mechanism
}
```

## 12. Documentation and Examples

### 12.1 User Documentation

- Detailed explanation of configuration parameters
- List of supported document formats
- Performance tuning guide
- Troubleshooting guide

### 12.2 Developer Documentation

- API documentation
- Extension development guide
- Contribution guide

## 13. Release Plan

### 13.1 Phase One (MVP)

- Basic document parsing functionality
- Support for common formats (PDF, DOC, XLS, TXT)
- Basic error handling

### 13.2 Phase Two

- OCR support
- Advanced content processing
- Performance optimization

### 13.3 Phase Three

- Custom parser support
- Enhanced metadata extraction
- Monitoring and metrics

## 14. Summary

TikaDocument Transform brings document processing capabilities to SeaTunnel. Built on Apache Tika's mature technology stack, it can reliably parse a wide range of document formats and extract structured data. The design accounts for performance, error handling, and extensibility to meet enterprise-grade data processing needs. With flexible configuration options and multiple error handling strategies, users can tailor its behavior to their scenarios and keep data processing pipelines stable and reliable.

### Usage Scenario

_No response_

### Related issues

_No response_

### Are you willing to submit a PR?

- [x] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
