This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d80c1e435d [flink] Restart dedicated compact job if schema change is 
detected (#6124)
d80c1e435d is described below

commit d80c1e435d6cdb0469838e541a7acea72c5e4062
Author: tsreaper <[email protected]>
AuthorDate: Sun May 24 10:56:57 2026 +0800

    [flink] Restart dedicated compact job if schema change is detected (#6124)
    
    Currently, the streaming compact job does not handle schema changes. If
    the user adds a column, the compact job ignores it and that column is
    lost.
    
    This PR makes the compact job fail and restart when a schema change is
    detected, so that new columns are not lost.
---
 .../paimon/table/system/CompactBucketsTable.java   | 16 ++++++-
 .../flink/procedure/CompactProcedureITCase.java    | 52 ++++++++++++++++++++++
 2 files changed, 67 insertions(+), 1 deletion(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java
index 8ffae30bd7..dad19ff263 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java
@@ -80,8 +80,8 @@ public class CompactBucketsTable implements DataTable, 
ReadonlyTable {
 
     private final FileStoreTable wrapped;
     private final boolean isContinuous;
-
     @Nullable private final String databaseName;
+    private final long baseSchemaId;
 
     private static final RowType ROW_TYPE =
             RowType.of(
@@ -111,6 +111,7 @@ public class CompactBucketsTable implements DataTable, 
ReadonlyTable {
         this.wrapped = wrapped;
         this.isContinuous = isContinuous;
         this.databaseName = databaseName;
+        this.baseSchemaId = wrapped.schema().id();
     }
 
     @Override
@@ -260,6 +261,19 @@ public class CompactBucketsTable implements DataTable, 
ReadonlyTable {
             }
 
             DataSplit dataSplit = (DataSplit) split;
+            // in case of schema evolution
+            for (DataFileMeta file : dataSplit.dataFiles()) {
+                System.out.println(
+                        "File schema id " + file.schemaId() + ", base schema 
id " + baseSchemaId);
+                if (file.schemaId() > baseSchemaId) {
+                    throw new RuntimeException(
+                            String.format(
+                                    "File %s has schema id %d, "
+                                            + "which is larger than the base 
schema id %d. "
+                                            + "Trying to restart the job.",
+                                    file.fileName(), file.schemaId(), 
baseSchemaId));
+                }
+            }
 
             List<DataFileMeta> files = Collections.emptyList();
             if (isContinuous) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
index 33b9aaf818..afe45cef66 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CompactProcedureITCase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.flink.action.CompactAction;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.DataSplit;
@@ -29,19 +30,24 @@ import org.apache.paimon.utils.BlockingIterator;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.StringUtils;
 
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** IT Case for {@link CompactProcedure}. */
 public class CompactProcedureITCase extends CatalogITCaseBase {
@@ -423,6 +429,52 @@ public class CompactProcedureITCase extends 
CatalogITCaseBase {
                         "The full compact strategy is only supported in batch 
mode. Please add -Dexecution.runtime-mode=BATCH.");
     }
 
+    // ----------------------- Schema Evolution -----------------------
+
+    @Test
+    public void testFailWhenSchemaChanged() throws Exception {
+        tEnv.executeSql(
+                "CREATE TABLE T (\n"
+                        + "  pt INT,\n"
+                        + "  k INT,\n"
+                        + "  v INT,\n"
+                        + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (pt) WITH (\n"
+                        + "  'bucket' = '1',\n"
+                        + "  'deletion-vectors.enabled' = 'true',\n"
+                        + "  'write-only' = 'true'\n"
+                        + ")");
+        sql("INSERT INTO T VALUES (1, 0, 0), (2, 0, 0)");
+        StreamExecutionEnvironment env =
+                streamExecutionEnvironmentBuilder()
+                        .parallelism(1)
+                        .checkpointIntervalMs(100)
+                        .build();
+        Map<String, String> catalogOptions = new HashMap<>();
+        catalogOptions.put("warehouse", path);
+        CompactAction action = new CompactAction("default", "T", 
catalogOptions, new HashMap<>());
+        action.withStreamExecutionEnvironment(env);
+        action.build();
+        JobClient jobClient = env.executeAsync();
+        sqlAssertWithRetry(
+                "SELECT * FROM T",
+                result -> result.containsExactlyInAnyOrder(Row.of(1, 0, 0), 
Row.of(2, 0, 0)));
+
+        sql("ALTER TABLE T ADD v2 INT");
+        sql("INSERT INTO T VALUES (1, 0, 10, 10), (2, 2, 20, 20)");
+        // compact job should fail with new schema
+        assertThatThrownBy(() -> jobClient.getJobExecutionResult().get())
+                .rootCause()
+                .hasMessageContaining("has schema id");
+
+        sql("CALL sys.compact(`table` => 'default.T')");
+        sqlAssertWithRetry(
+                "SELECT * FROM T",
+                result ->
+                        result.containsExactlyInAnyOrder(
+                                Row.of(1, 0, 10, 10), Row.of(2, 0, 0, null), 
Row.of(2, 2, 20, 20)));
+    }
+
     private void checkLatestSnapshot(
             FileStoreTable table, long snapshotId, Snapshot.CommitKind 
commitKind) {
         SnapshotManager snapshotManager = table.snapshotManager();

Reply via email to