mmmxl commented on issue #6007: URL: https://github.com/apache/paimon/issues/6007#issuecomment-3174133377
I've encountered this issue as well. ## Identified Causes 1.The project code uses SinkFunction.Context. In Flink 1.x, this class resides in `org.apache.flink.streaming.api.functions.sink.SinkFunction`, but in 2.x it was moved to `org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction`. 2.The Maven build didn't select `flink2` and `java11` profiles during packaging, resulting in compilation against Flink 1.20. ## Fix Build with the correct profiles: ```bash mvn clean install -P flink2 -P java11 -am -DskipTests ``` ## Troubleshooting Process ### SQL Queries: ```sql CREATE TABLE IF NOT EXISTS paimon_catalog.tmp.test_paimon_log_fixed ( order_number BIGINT, price INT, buyer STRING, dt STRING ) PARTITIONED BY ( dt ) WITH ( -- bucket 'bucket' = '1', 'bucket-key' = 'order_number', 'write-only' = 'true' ); -- create a word data generator table CREATE TABLE tmp_order_data_gen ( order_number BIGINT, price INT, buyer STRING, `proc_time` AS PROCTIME () ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '10000', 'fields.buyer.length' = '100' ); INSERT INTO paimon_catalog.tmp.test_paimon_log_fixed SELECT order_number, price, buyer, DATE_FORMAT(proc_time, 'yyyy-MM-dd') AS dt FROM tmp_order_data_gen; ``` ### Error consistency The error message is consistent with the previous case reported. ### Error location The error actually occurs in `org.apache.paimon.flink.sink.RowDataStoreWriteOperator#createStreamOperator` when instantiating the `RowDataStoreWriteOperatorclass`: ```java @Override @SuppressWarnings("unchecked") public <T extends StreamOperator<Committable>> T createStreamOperator( StreamOperatorParameters<Committable> parameters) { return (T) new RowDataStoreWriteOperator( parameters, table, logSinkFunction, storeSinkWriteProvider, initialCommitUser); } ``` ### Problematic context class `RowDataStoreWriteOperator$SimpleContext.class` ```java // Source code is decompiled from a .class file using FernFlower decompiler. package org.apache.paimon.flink.sink; import javax.annotation.Nullable; // Problematic import import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; class RowDataStoreWriteOperator$SimpleContext implements SinkFunction.Context { @Nullable private Long timestamp; private final ProcessingTimeService processingTimeService; public RowDataStoreWriteOperator$SimpleContext(RowDataStoreWriteOperator var1, ProcessingTimeService processingTimeService) { this.this$0 = var1; this.processingTimeService = processingTimeService; } public long currentProcessingTime() { return this.processingTimeService.getCurrentProcessingTime(); } public long currentWatermark() { return RowDataStoreWriteOperator.access$100(this.this$0); } public Long timestamp() { return this.timestamp; } } ``` ## Additional Context: - The official JAR from [Maven Central](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-2.0/1.2.0/paimon-flink-2.0-1.2.0.jar) also exhibits this issue and cannot be used directly. Local compilation with the above command is currently required. - We hope this information helps the community improve future versions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@paimon.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org