cmx-ops opened a new issue, #7740:
URL: https://github.com/apache/seatunnel/issues/7740
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

I initially planned to deduplicate the data with SQL, but the SQL transform does not currently support `GROUP BY` aggregation, and `DISTINCT` has no effect. I then found that the DynamicCompile transform lets me process the data with my own code, but the Java deduplication code I wrote fails to compile, and I can't tell where the problem is.

My code:

```java
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.type.BasicType;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;

public class DeduplicationTransform {

    // Use a set to store the composite keys that have already been seen
    private Set<String> uniqueRows = new HashSet<String>();

    public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) {
        ArrayList<Column> columns = new ArrayList<Column>();
        // Copy the original columns
        columns.add(PhysicalColumn.of("name", BasicType.STRING_TYPE, 255, true, "", ""));
        columns.add(PhysicalColumn.of("age", BasicType.INT_TYPE, 10, true, "", ""));
        columns.add(PhysicalColumn.of("gender", BasicType.STRING_TYPE, 50, true, "", ""));
        return columns.toArray(new Column[0]);
    }

    public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
        // Build a composite key from name, age and gender
        String compositeKey = inputRow.getString("name") + "#" + inputRow.getInt("age") + "#" + inputRow.getString("gender");
        // Check whether this composite key has been seen before
        if (uniqueRows.add(compositeKey)) {
            Object[] fieldValues = new Object[3];
            fieldValues[0] = inputRow.getString("name");
            fieldValues[1] = inputRow.getInt("age");
            fieldValues[2] = inputRow.getString("gender");
            return fieldValues;
        } else {
            // If this row has already been seen, return null or an empty array
            return null;
        }
    }
}
```

### SeaTunnel Version

2.3.7

### SeaTunnel Config

```conf
# Data transformation: deduplicate using SQL
transform {
  DynamicCompile {
    source_table_name = "csv_data"
    result_table_name = "csv_data_output"
    compile_language = "JAVA"
    compile_pattern = "ABSOLUTE_PATH"
    absolute_path = """/home/reformer/apache_seaTunnel/dynamic_compile_code/DeduplicationTransform.java"""
  }
}

# Data output: print to the console
#sink {
#  Console {
#    source_table_name = "csv_data_output"
#  }
#}

sink {
  LocalFile {
    path = "/home/reformer/apache_seaTunnel/localfile/test_local_file_output"
    file_format_type = "csv"
    field_delimiter = ","
    schema {
      fields {
        name = string
        age = int
        gender = string
      }
    }
    compress_codec = "none"
    encoding = "UTF-8"
  }
}
```

### Running Command

```shell
./start-seatunnel-spark-2-connector-v2.sh --master local[4] --deploy-mode client --config ../config/local_file_source_distinct_by_code.conf
```

### Error Exception

```log
Caused by: org.apache.seatunnel.transform.exception.TransformException: ErrorCode:[COMPILE_TRANSFORM_ERROR_CODE-01], ErrorDescription:[CompileTransform error please check code] - org.apache.seatunnel.shade.org.codehaus.commons.compiler.CompileException: Line 21, Column 15: Assignment conversion not possible from type "java.lang.Object[]" to type "org.apache.seatunnel.api.table.catalog.Column[]"
	at org.apache.seatunnel.transform.dynamiccompile.DynamicCompileTransform.getOutputColumns(DynamicCompileTransform.java:89)
	at org.apache.seatunnel.transform.common.MultipleFieldOutputTransform.transformTableSchema(MultipleFieldOutputTransform.java:67)
	at org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform.transformCatalogTable(AbstractCatalogSupportTransform.java:64)
	at org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform.getProducedCatalogTable(AbstractCatalogSupportTransform.java:54)
	at org.apache.seatunnel.core.starter.spark.execution.TransformExecuteProcessor.sparkTransform(TransformExecuteProcessor.java:143)
	at org.apache.seatunnel.core.starter.spark.execution.TransformExecuteProcessor.execute(TransformExecuteProcessor.java:119)
	... 16 more
```

### Zeta or Flink or Spark Version

_No response_

### Java or Scala Version

_No response_

### Screenshots

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
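A possible cause, offered as an unverified assumption: the `CompileException` comes from the shaded `org.codehaus.commons.compiler` package, which suggests the code is compiled by Janino rather than javac. Janino is understood to parse generic type arguments but otherwise ignore them, so `columns.toArray(new Column[0])` would be typed as returning `Object[]`. That matches the "Assignment conversion not possible from type java.lang.Object[]" message, and Line 21 appears to be the `return columns.toArray(new Column[0]);` statement. The sketch below shows two ways to produce a `Column[]` that do not depend on generic inference: building the array literal directly, or casting the result of `toArray` explicitly. `Column` here is a hypothetical stand-in for `org.apache.seatunnel.api.table.catalog.Column` so the sketch compiles without SeaTunnel, and `JaninoFriendlyDedupSketch` and `firstOccurrence` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;

public class JaninoFriendlyDedupSketch {

    // Hypothetical stand-in for org.apache.seatunnel.api.table.catalog.Column,
    // used only so this sketch compiles without SeaTunnel on the classpath.
    static class Column {
        final String name;

        Column(String name) {
            this.name = name;
        }
    }

    // Option 1: build the array directly, so no generic method call is involved.
    static Column[] columnsDirect() {
        return new Column[] {new Column("name"), new Column("age"), new Column("gender")};
    }

    // Option 2: keep the list but cast toArray's result explicitly. A compiler that
    // ignores type arguments sees toArray(...) as returning Object[]; the cast restores
    // Column[]. It succeeds at runtime because toArray(new Column[0]) allocates a Column[].
    static Column[] columnsViaList() {
        ArrayList<Column> columns = new ArrayList<Column>();
        columns.add(new Column("name"));
        columns.add(new Column("age"));
        columns.add(new Column("gender"));
        return (Column[]) columns.toArray(new Column[0]);
    }

    // Composite-key bookkeeping as in the original code: Set.add returns false for a repeat key.
    private final Set<String> seenKeys = new HashSet<String>();

    boolean firstOccurrence(String name, Object age, String gender) {
        String key = name + "#" + age + "#" + gender;
        return seenKeys.add(key);
    }

    public static void main(String[] args) {
        JaninoFriendlyDedupSketch sketch = new JaninoFriendlyDedupSketch();
        System.out.println(columnsDirect().length + " " + columnsViaList().length); // prints "3 3"
        System.out.println(sketch.firstOccurrence("alice", 30, "f")); // prints "true"
        System.out.println(sketch.firstOccurrence("alice", 30, "f")); // prints "false"
    }
}
```

In the DynamicCompile source itself, the corresponding change would be `return (Column[]) columns.toArray(new Column[0]);` or returning a `new Column[] {...}` literal. This only targets the reported compile error; whether `SeaTunnelRowAccessor` offers `getString`/`getInt` by field name, and whether returning `null` from `getInlineOutputFieldValues` actually drops a row, are not established here.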
