cmx-ops opened a new issue, #7740:
URL: https://github.com/apache/seatunnel/issues/7740

   ### Search before asking
   
   - [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   I initially planned to deduplicate the data with SQL, but the SQL transform currently does not support GROUP BY aggregation, and DISTINCT has no effect. I then found that DynamicCompile lets me process the data with custom code, but the Java code I wrote for deduplication fails to compile, and I cannot tell where the problem lies.
   
   My code:
   ```java
   import org.apache.seatunnel.api.table.catalog.Column;
   import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
   import org.apache.seatunnel.api.table.catalog.CatalogTable;
   import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
   import org.apache.seatunnel.api.table.type.BasicType;
   
   import java.util.ArrayList;
   import java.util.HashSet;
   import java.util.Set;
   
   public class DeduplicationTransform {
       // Use a set to store the composite keys that have already been seen
       private Set<String> uniqueRows = new HashSet<String>();
   
       public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) {
           ArrayList<Column> columns = new ArrayList<Column>();
           // Copy the original columns
           columns.add(PhysicalColumn.of("name", BasicType.STRING_TYPE, 255, true, "", ""));
           columns.add(PhysicalColumn.of("age", BasicType.INT_TYPE, 10, true, "", ""));
           columns.add(PhysicalColumn.of("gender", BasicType.STRING_TYPE, 50, true, "", ""));
           return columns.toArray(new Column[0]);
       }
   
       public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
           // Build a composite key from name, age, and gender
           String compositeKey = inputRow.getString("name") + "#" + inputRow.getInt("age") + "#" + inputRow.getString("gender");

           // Check whether this composite key has already appeared
           if (uniqueRows.add(compositeKey)) {
               Object[] fieldValues = new Object[3];
               fieldValues[0] = inputRow.getString("name");
               fieldValues[1] = inputRow.getInt("age");
               fieldValues[2] = inputRow.getString("gender");
               return fieldValues;
           } else {
               // If this row has already appeared, return null or an empty array
               return null;
           }
       }
   }
   ```
   
   
   
   
   
   
   ### SeaTunnel Version
   
   2.3.7
   
   ### SeaTunnel Config
   
   ```conf
   # Data transform: deduplicate using SQL
   transform {
    DynamicCompile {
     source_table_name = "csv_data"
     result_table_name = "csv_data_output"
     compile_language = "JAVA"
     compile_pattern = "ABSOLUTE_PATH"
     
     absolute_path="""/home/reformer/apache_seaTunnel/dynamic_compile_code/DeduplicationTransform.java"""
    }
   }
   
   # Data output: write to the console
   #sink {
   #  Console {
   #    source_table_name = "csv_data_output"
   #  }
   #}
   
   sink {
     LocalFile {
       path = "/home/reformer/apache_seaTunnel/localfile/test_local_file_output"
       file_format_type = "csv"
       field_delimiter = ","
       schema {
         fields {
           name = string
           age = int
           gender = string
         }
       }
       compress_codec = "none"
       encoding = "UTF-8"
     }
   }
   ```
   
   
   ### Running Command
   
   ```shell
   ./start-seatunnel-spark-2-connector-v2.sh --master local[4] --deploy-mode client --config ../config/local_file_source_distinct_by_code.conf
   ```
   
   
   ### Error Exception
   
   ```log
   Caused by: org.apache.seatunnel.transform.exception.TransformException: ErrorCode:[COMPILE_TRANSFORM_ERROR_CODE-01], ErrorDescription:[CompileTransform error please check code] - org.apache.seatunnel.shade.org.codehaus.commons.compiler.CompileException: Line 21, Column 15: Assignment conversion not possible from type "java.lang.Object[]" to type "org.apache.seatunnel.api.table.catalog.Column[]"
           at org.apache.seatunnel.transform.dynamiccompile.DynamicCompileTransform.getOutputColumns(DynamicCompileTransform.java:89)
           at org.apache.seatunnel.transform.common.MultipleFieldOutputTransform.transformTableSchema(MultipleFieldOutputTransform.java:67)
           at org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform.transformCatalogTable(AbstractCatalogSupportTransform.java:64)
           at org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform.getProducedCatalogTable(AbstractCatalogSupportTransform.java:54)
           at org.apache.seatunnel.core.starter.spark.execution.TransformExecuteProcessor.sparkTransform(TransformExecuteProcessor.java:143)
           at org.apache.seatunnel.core.starter.spark.execution.TransformExecuteProcessor.execute(TransformExecuteProcessor.java:119)
           ... 16 more
   ```
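
One possible reading of this error, offered as a sketch rather than a confirmed diagnosis: the `org.codehaus.commons.compiler` package in the stack trace belongs to the Janino project, and Janino documents that it parses generic type arguments but otherwise ignores them. Under that assumption, `columns.toArray(new Column[0])` is treated as returning `Object[]`, which would match the message, and Line 21 appears to be the `return columns.toArray(new Column[0]);` statement. An explicit `(Column[])` cast should then avoid the conversion error. The example below reproduces the situation in plain Java with a raw `ArrayList`. The nested `Column` class and the `ColumnArrayCastSketch` name are hypothetical stand-ins so that it runs without SeaTunnel on the classpath.

```java
import java.util.ArrayList;

// Raw types are used on purpose to mimic a compiler that ignores type arguments.
@SuppressWarnings({"rawtypes", "unchecked"})
public class ColumnArrayCastSketch {

    // Hypothetical stand-in for org.apache.seatunnel.api.table.catalog.Column.
    static class Column {
        final String name;

        Column(String name) {
            this.name = name;
        }
    }

    // With a raw ArrayList, toArray(Object[]) is declared to return Object[],
    // so the result must be cast before it can be returned as Column[].
    // The cast succeeds at runtime because toArray builds an array of the
    // same runtime type as its argument (Column[] here).
    static Column[] toColumnArray(ArrayList columns) {
        return (Column[]) columns.toArray(new Column[0]);
    }

    public static void main(String[] args) {
        ArrayList columns = new ArrayList();
        columns.add(new Column("name"));
        columns.add(new Column("age"));
        columns.add(new Column("gender"));

        Column[] result = toColumnArray(columns);
        System.out.println(result.length + " columns, first = " + result[0].name);
    }
}
```

Applied to the code in this report, that would mean changing the return statement to `return (Column[]) columns.toArray(new Column[0]);`. Whether this alone makes the transform run end to end has not been verified here.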
   
   
   ### Zeta or Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

