YuriyGavrilov commented on issue #8604:
URL: https://github.com/apache/seatunnel/issues/8604#issuecomment-2638901983
I made some config updates to filter rows, like this:
```
env {
parallelism = 1
job.mode = "BATCH"
read_limit.rows_per_second = 120
}
source {
LocalFile {
schema {
fields {
no = string
answer = string
}
}
path = "/config/2data44.csv"
file_format_type = "text"
field_delimiter = "\t"
csv.quote_char = "\""
skip_header_row_number = 1
read_columns = ["no", "answer"]
plugin_output = "source_table"
parse_options {
skip_empty_rows = true
allow_missing_values = true
error_handling = "skip_row"
}
}
}
transform {
Sql {
plugin_input = "source_table"
source_table_name = "source_table"
result_table_name = "cleaned_table"
plugin_output = "cleaned_output"
query = """
SELECT
REGEXP_REPLACE(
no,
'[^\\sа-яА-ЯёЁ0-9\\.,!\\?;:\\\"«»\\(\\)\\-]',
' '
) AS cleaned_no,
REGEXP_REPLACE(
answer,
'[^\\sа-яА-ЯёЁ0-9\\.,!\\?;:\\\"«»\\(\\)\\-]',
' '
) AS cleaned_answer
FROM source_table
"""
}
LLM {
plugin_input = "cleaned_output"
source_table_name = "cleaned_table"
result_table_name = "llm_processed_table"
plugin_output = "llm_output"
model_provider = OPENAI
inference_columns = ["cleaned_answer"]
model = gpt-4o-mini
# error_handling = "SKIP"
timeout = 950000
retry_times = 3
batch_size = 1
delay_between_requests = 2000
normalize_input = true
min_message_length = 2
api_key = "XXXXX" #
# output_column = "llm_result" # Добавьте output_column
api_path = "https://api.XXXX/openai/v1/chat/completions"
error_handling {
mode = "SKIP"
default_value = "999"
}
request_limits {
max_requests_per_minute = 100
max_tokens_per_minute = 200000
}
request_config {
temperature = 0.0
max_tokens = 30000
top_p = 1.0
}
output_data_type = "STRING"
input_fields = ["cleaned_answer"]
result_field = "llm_summary"
append_fields = ["cleaned_no", "cleaned_answer"]
prompt = "XXXX ${cleaned_answer}. XXXXX"
}
}
sink {
LocalFile {
plugin_input = "llm_output"
columns = ["cleaned_no", "cleaned_answer", "llm_summary"]
path = "/config/3data_out.csv"
file_format_type = "text"
field_delimiter = "\t"
encoding = "UTF-8"
rolling_policy.size = "128MB"
data_save_mode = "APPEND_DATA"
enable_checkpoint = true
tmp_path = "/config/tmp/sea"
write_options {
max_rows_in_memory = 100000
batch_size = 500
retry_times = 3
retry_interval = 1000
}
}
}
```
I received an error close to the end of the main file:
```
2025-02-05 22:20:55,446 ERROR [o.a.s.c.s.SeaTunnel ] [main] -
===============================================================================
2025-02-05 22:20:55,446 ERROR [o.a.s.c.s.SeaTunnel ] [main] -
Fatal Error,
2025-02-05 22:20:55,446 ERROR [o.a.s.c.s.SeaTunnel ] [main] -
Please submit bug report in https://github.com/apache/seatunnel/issues
2025-02-05 22:20:55,446 ERROR [o.a.s.c.s.SeaTunnel ] [main] -
Reason:SeaTunnel job executed failed
2025-02-05 22:20:55,447 ERROR [o.a.s.c.s.SeaTunnel ] [main] -
Exception
StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException:
SeaTunnel job executed failed
at
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at
org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by:
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException:
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException:
ErrorCode:[FILE-08], ErrorDescription:[File read failed] - Read data from this
file [source_table_file:/config/2data44.csv] failed
at
org.apache.seatunnel.connectors.seatunnel.file.source.reader.MultipleTableFileSourceReader.pollNext(MultipleTableFileSourceReader.java:85)
at
org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
at
org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
at
org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:169)
at
org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
at
org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:694)
at
org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1019)
at
org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: Failed to inference model with row
SeaTunnelRow{tableId=, kind=+I, fields=[2094884, "REMOVED THIS TEXT, but it
looks fine visually"]}
at
org.apache.seatunnel.transform.nlpmodel.llm.LLMTransform.getOutputFieldValue(LLMTransform.java:160)
at
org.apache.seatunnel.transform.common.SingleFieldOutputTransform.transformRow(SingleFieldOutputTransform.java:47)
at
org.apache.seatunnel.transform.common.SingleFieldOutputTransform.transformRow(SingleFieldOutputTransform.java:35)
at
org.apache.seatunnel.transform.common.AbstractSeaTunnelTransform.transform(AbstractSeaTunnelTransform.java:80)
at
org.apache.seatunnel.transform.common.AbstractCatalogSupportMapTransform.map(AbstractCatalogSupportMapTransform.java:42)
at
org.apache.seatunnel.transform.common.AbstractCatalogSupportMapTransform.map(AbstractCatalogSupportMapTransform.java:27)
at
org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform.map(AbstractMultiCatalogMapTransform.java:40)
at
org.apache.seatunnel.transform.common.AbstractMultiCatalogMapTransform.map(AbstractMultiCatalogMapTransform.java:28)
at
org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle.transform(TransformFlowLifeCycle.java:160)
at
org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle.received(TransformFlowLifeCycle.java:122)
at
org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle.received(TransformFlowLifeCycle.java:43)
at
org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.sendRecordToNext(SeaTunnelSourceCollector.java:195)
at
org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.collect(SeaTunnelSourceCollector.java:112)
at
org.apache.seatunnel.connectors.seatunnel.file.source.reader.TextReadStrategy.lambda$readProcess$0(TextReadStrategy.java:133)
at
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at java.util.stream.SliceOps$1$1.accept(SliceOps.java:204)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at
java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at
java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
at
org.apache.seatunnel.connectors.seatunnel.file.source.reader.TextReadStrategy.readProcess(TextReadStrategy.java:104)
at
org.apache.seatunnel.connectors.seatunnel.file.source.reader.AbstractReadStrategy.resolveArchiveCompressedInputStream(AbstractReadStrategy.java:268)
at
org.apache.seatunnel.connectors.seatunnel.file.source.reader.TextReadStrategy.read(TextReadStrategy.java:71)
at
org.apache.seatunnel.connectors.seatunnel.file.source.reader.MultipleTableFileSourceReader.pollNext(MultipleTableFileSourceReader.java:81)
... 12 more
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at
sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:464)
at
sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:68)
at
sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1346)
at sun.security.ssl.SSLSocketImpl.access$300(SSLSocketImpl.java:73)
at
sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:962)
at
org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:139)
at
org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:155)
at
org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:284)
at
org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)
at
org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)
at
org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:261)
at
org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:165)
at
org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:157)
at
org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:272)
at
org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:124)
at
org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
at
org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
at
org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
at
org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
at
org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
at
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
at
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
at
org.apache.seatunnel.transform.nlpmodel.llm.remote.kimiai.KimiAIModel.chatWithModel(KimiAIModel.java:75)
at
org.apache.seatunnel.transform.nlpmodel.llm.remote.AbstractModel.inference(AbstractModel.java:97)
at
org.apache.seatunnel.transform.nlpmodel.llm.LLMTransform.getOutputFieldValue(LLMTransform.java:142)
... 39 more
at
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
... 2 more
```
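This time it processed about 7,300 lines, so I will dig into the filtering further. But
it is interesting that the LLM transform didn't log anything useful and couldn't skip the error:
even with `error_handling { mode = "SKIP" }` configured, the request's `SocketTimeoutException: Read timed out`
still failed the whole job.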
```
2025-02-05 22:20:46,888 INFO [o.a.s.e.c.j.JobMetricsRunner ]
[job-metrics-runner-939621343589040129] -
***********************************************
Job Progress Information
***********************************************
Job Id : 939621343589040129
Read Count So Far : 7364
Write Count So Far : 7360
Average Read Count : 1/s
Average Write Count : 1/s
Last Statistic Time : 2025-02-05 22:19:46
Current Statistic Time : 2025-02-05 22:20:46
***********************************************
2025-02-05 22:20:52,306 WARN [o.a.s.e.s.TaskExecutionService]
[BlockingWorker-TaskGroupLocation{jobId=939621343589040129, pipelineId=1,
taskGroupId=50000}] - [localhost]:5801 [seatunnel-1050] [5.1] Exception in
org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask@60c5a77f
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException:
ErrorCode:[FILE-08], ErrorDescription:[File read failed] - Read data from this
file [source_table_file:/config/2data44.csv] failed
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]