This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ba967e199b [flink] Support blob compaction for data evolution tables (#7932)
ba967e199b is described below

commit ba967e199b40750d397570a8ba89485106956876
Author: YeJunHao <[email protected]>
AuthorDate: Fri May 22 19:25:54 2026 +0800

    [flink] Support blob compaction for data evolution tables (#7932)
    
    Enable Flink data-evolution compaction to plan and execute blob
    compaction tasks.
    
    This implements `DataEvolutionCompactTask` support for `blobTask` by
    rewriting dedicated blob files with a `RowDataFileWriter` configured
    with `BlobFileFormat`. It also enables the Flink data-evolution compact
    source to plan blob tasks.
    
    The reader path now also supports blob-only merge groups, which is
    required when a data-evolution blob task contains multiple blob fields
    without the corresponding normal data file in the same split.
---
 docs/generated/core_configuration.html             |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  11 +
 .../DataEvolutionCompactCoordinator.java           | 135 +++++++++++-
 .../dataevolution/DataEvolutionCompactTask.java    | 231 ++++++++++++++++++-
 .../paimon/append/MultipleBlobTableTest.java       |  89 +++++++-
 .../DataEvolutionCompactCoordinatorTest.java       | 244 ++++++++++++++++++++-
 .../source/DataEvolutionTableCompactSource.java    |   3 +-
 .../org/apache/paimon/flink/BlobTableITCase.java   | 116 +++++++++-
 8 files changed, 816 insertions(+), 19 deletions(-)

diff --git a/docs/generated/core_configuration.html 
b/docs/generated/core_configuration.html
index c11941ea27..5d608dba9e 100644
--- a/docs/generated/core_configuration.html
+++ b/docs/generated/core_configuration.html
@@ -434,6 +434,12 @@
             <td>Duration</td>
             <td>The TTL in rocksdb index for cross partition upsert (primary 
keys not contain all partition fields), this can avoid maintaining too many 
indexes and lead to worse and worse performance, but please note that this may 
also cause data duplication.</td>
         </tr>
+        <tr>
+            <td><h5>blob-compaction.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to compact blob files when compacting a data evolution 
table.</td>
+        </tr>
         <tr>
             <td><h5>data-evolution.enabled</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index d5a1bb7fb0..f11a1dd0d5 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2224,6 +2224,13 @@ public class CoreOptions implements Serializable {
                     .defaultValue(false)
                     .withDescription("Whether enable data evolution for row 
tracking table.");
 
+    public static final ConfigOption<Boolean> BLOB_COMPACTION_ENABLED =
+            key("blob-compaction.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to compact blob files when compacting a 
data evolution table.");
+
     public static final ConfigOption<Boolean> SNAPSHOT_IGNORE_EMPTY_COMMIT =
             key("snapshot.ignore-empty-commit")
                     .booleanType()
@@ -3690,6 +3697,10 @@ public class CoreOptions implements Serializable {
         return options.get(DATA_EVOLUTION_ENABLED);
     }
 
+    public boolean blobCompactionEnabled() {
+        return options.get(BLOB_COMPACTION_ENABLED);
+    }
+
     public boolean prepareCommitWaitCompaction() {
         if (!needLookup()) {
             return false;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
index 7f70bed784..837b0c3718 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
@@ -20,6 +20,7 @@ package org.apache.paimon.append.dataevolution;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestEntry;
@@ -30,6 +31,8 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.EndOfScanException;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Range;
 import org.apache.paimon.utils.RangeHelper;
 
@@ -43,10 +46,15 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 import java.util.TreeMap;
+import java.util.function.LongFunction;
+import java.util.stream.Collectors;
 
+import static java.util.Comparator.comparingLong;
 import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
 import static org.apache.paimon.manifest.ManifestFileMeta.allContainsRowId;
+import static org.apache.paimon.types.DataTypeRoot.BLOB;
 import static org.apache.paimon.types.VectorType.isVectorStoreFile;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -54,6 +62,7 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
 public class DataEvolutionCompactCoordinator {
 
     private static final int FILES_BATCH = 100_000;
+    private static final int BLOB_COMPACT_MIN_FILE_NUM = 2;
 
     private final CompactScanner scanner;
     private final CompactPlanner planner;
@@ -72,6 +81,7 @@ public class DataEvolutionCompactCoordinator {
         long targetFileSize = options.targetFileSize(false);
         long openFileCost = options.splitOpenFileCost();
         long compactMinFileNum = options.compactionMinFileNum();
+        Set<String> blobInlineFields = options.blobInlineField();
 
         this.scanner =
                 new CompactScanner(
@@ -82,8 +92,20 @@ public class DataEvolutionCompactCoordinator {
                         compactBlob,
                         compactVector,
                         targetFileSize,
+                        options.blobTargetFileSize(),
                         openFileCost,
-                        compactMinFileNum);
+                        compactMinFileNum,
+                        schemaId -> 
table.schemaManager().schema(schemaId).logicalRowType(),
+                        compactBlob
+                                ? table.rowType().getFields().stream()
+                                        .filter(
+                                                field ->
+                                                        field.type().is(BLOB)
+                                                                && 
!blobInlineFields.contains(
+                                                                        
field.name()))
+                                        .map(DataField::id)
+                                        .collect(Collectors.toSet())
+                                : null);
     }
 
     public List<DataEvolutionCompactTask> plan() {
@@ -141,20 +163,52 @@ public class DataEvolutionCompactCoordinator {
         private final boolean compactBlob;
         private final boolean compactVector;
         private final long targetFileSize;
+        private final long blobTargetFileSize;
         private final long openFileCost;
         private final long compactMinFileNum;
+        private final LongFunction<RowType> schemaFetcher;
+        @Nullable private final Set<Integer> currentBlobFieldIds;
 
+        @VisibleForTesting
         CompactPlanner(
                 boolean compactBlob,
                 boolean compactVector,
                 long targetFileSize,
                 long openFileCost,
                 long compactMinFileNum) {
+            this(
+                    compactBlob,
+                    compactVector,
+                    targetFileSize,
+                    targetFileSize,
+                    openFileCost,
+                    compactMinFileNum,
+                    schemaId -> {
+                        throw new IllegalStateException(
+                                "Schema fetcher is required for blob 
compaction.");
+                    },
+                    null);
+        }
+
+        CompactPlanner(
+                boolean compactBlob,
+                boolean compactVector,
+                long targetFileSize,
+                long blobTargetFileSize,
+                long openFileCost,
+                long compactMinFileNum,
+                LongFunction<RowType> schemaFetcher,
+                @Nullable Set<Integer> currentBlobFieldIds) {
             this.compactBlob = compactBlob;
             this.compactVector = compactVector;
             this.targetFileSize = targetFileSize;
+            this.blobTargetFileSize = blobTargetFileSize;
             this.openFileCost = openFileCost;
             this.compactMinFileNum = compactMinFileNum;
+            Map<Long, RowType> schemaCache = new HashMap<>();
+            this.schemaFetcher =
+                    schemaId -> schemaCache.computeIfAbsent(schemaId, 
schemaFetcher::apply);
+            this.currentBlobFieldIds = currentBlobFieldIds;
         }
 
         List<DataEvolutionCompactTask> compactPlan(List<ManifestEntry> input) {
@@ -312,8 +366,8 @@ public class DataEvolutionCompactCoordinator {
                     blobFiles.addAll(
                             dataFileToBlobFiles.getOrDefault(dataFile, 
Collections.emptyList()));
                 }
-                if (blobFiles.size() >= compactMinFileNum) {
-                    tasks.add(new DataEvolutionCompactTask(partition, 
blobFiles, true));
+                for (List<DataFileMeta> blobFilesToCompact : 
blobFileGroupsToCompact(blobFiles)) {
+                    tasks.add(new DataEvolutionCompactTask(partition, 
blobFilesToCompact, true));
                 }
             }
 
@@ -330,5 +384,80 @@ public class DataEvolutionCompactCoordinator {
             }
             return tasks;
         }
+
+        private List<List<DataFileMeta>> 
blobFileGroupsToCompact(List<DataFileMeta> blobFiles) {
+            Map<Integer, List<DataFileMeta>> fieldIdToFiles = new 
LinkedHashMap<>();
+            for (DataFileMeta blobFile : blobFiles) {
+                int fieldId = blobFieldId(blobFile);
+                if (currentBlobFieldIds == null || 
currentBlobFieldIds.contains(fieldId)) {
+                    fieldIdToFiles.computeIfAbsent(fieldId, key -> new 
ArrayList<>()).add(blobFile);
+                }
+            }
+
+            List<List<DataFileMeta>> result = new ArrayList<>();
+            for (List<DataFileMeta> files : fieldIdToFiles.values()) {
+                result.addAll(fileGroupsToCompact(files));
+            }
+            return result;
+        }
+
+        private List<List<DataFileMeta>> 
fileGroupsToCompact(List<DataFileMeta> files) {
+            List<List<DataFileMeta>> result = new ArrayList<>();
+            List<DataFileMeta> sortedFiles = new ArrayList<>(files);
+            sortedFiles.sort(comparingLong(DataFileMeta::nonNullFirstRowId));
+
+            List<DataFileMeta> continuousFiles = new ArrayList<>();
+            long expectedFirstRowId = -1;
+            for (DataFileMeta file : sortedFiles) {
+                if (file.fileSize() >= blobTargetFileSize) {
+                    addFileGroupsToCompact(result, continuousFiles);
+                    continuousFiles.clear();
+                    expectedFirstRowId = -1;
+                    continue;
+                }
+
+                long firstRowId = file.nonNullFirstRowId();
+                if (!continuousFiles.isEmpty() && firstRowId != 
expectedFirstRowId) {
+                    addFileGroupsToCompact(result, continuousFiles);
+                    continuousFiles.clear();
+                }
+                continuousFiles.add(file);
+                expectedFirstRowId = firstRowId + file.rowCount();
+            }
+            addFileGroupsToCompact(result, continuousFiles);
+            return result;
+        }
+
+        private void addFileGroupsToCompact(
+                List<List<DataFileMeta>> result, List<DataFileMeta> 
continuousFiles) {
+            if (continuousFiles.size() < BLOB_COMPACT_MIN_FILE_NUM) {
+                return;
+            }
+            List<DataFileMeta> taskFiles = new ArrayList<>();
+            long fileSize = 0L;
+            for (DataFileMeta file : continuousFiles) {
+                taskFiles.add(file);
+                fileSize += file.fileSize();
+                if (fileSize >= blobTargetFileSize
+                        && taskFiles.size() >= BLOB_COMPACT_MIN_FILE_NUM) {
+                    result.add(taskFiles);
+                    taskFiles = new ArrayList<>();
+                    fileSize = 0L;
+                }
+            }
+
+            if (taskFiles.size() >= BLOB_COMPACT_MIN_FILE_NUM) {
+                result.add(taskFiles);
+            }
+        }
+
+        private int blobFieldId(DataFileMeta blobFile) {
+            checkArgument(
+                    blobFile.writeCols() != null && 
blobFile.writeCols().size() == 1,
+                    "Blob file %s should contain exactly one write column.",
+                    blobFile);
+            RowType rowType = schemaFetcher.apply(blobFile.schemaId());
+            return rowType.getField(blobFile.writeCols().get(0)).id();
+        }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
index 9dcec330e7..e56fe32f82 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
@@ -23,30 +23,44 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.append.AppendCompactTask;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fileindex.FileIndexOptions;
+import org.apache.paimon.format.blob.BlobFileFormat;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.FileWriter;
+import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.io.RowDataFileWriter;
+import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.operation.AppendFileStoreWrite;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.statistics.NoneSimpleColStatsCollector;
+import org.apache.paimon.statistics.SimpleColStatsCollector;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.LongCounter;
 import org.apache.paimon.utils.RecordWriter;
 import org.apache.paimon.utils.SetUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static java.util.Comparator.comparingLong;
 import static org.apache.paimon.types.BlobType.fieldNamesInBlobFile;
 import static org.apache.paimon.types.VectorType.fieldNamesInVectorFile;
 import static org.apache.paimon.types.VectorType.isVectorStoreFile;
@@ -57,8 +71,16 @@ public class DataEvolutionCompactTask extends 
AppendCompactTask {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DataEvolutionCompactTask.class);
 
-    private static final Map<String, String> DYNAMIC_WRITE_OPTIONS =
-            Collections.singletonMap(CoreOptions.TARGET_FILE_SIZE.key(), 
"99999 G");
+    private static final Map<String, String> DYNAMIC_WRITE_OPTIONS = 
dynamicWriteOptions();
+    private static final Map<String, String> BLOB_COMPACT_READ_OPTIONS =
+            Collections.singletonMap(CoreOptions.BLOB_AS_DESCRIPTOR.key(), 
"true");
+
+    private static Map<String, String> dynamicWriteOptions() {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.TARGET_FILE_SIZE.key(), "99999 G");
+        options.put(CoreOptions.BLOB_TARGET_FILE_SIZE.key(), "99999 G");
+        return Collections.unmodifiableMap(options);
+    }
 
     private final boolean blobTask;
 
@@ -74,8 +96,7 @@ public class DataEvolutionCompactTask extends 
AppendCompactTask {
 
     public CommitMessage doCompact(FileStoreTable table, String commitUser) 
throws Exception {
         if (blobTask) {
-            // TODO: support blob file compaction
-            throw new UnsupportedOperationException("Blob task is not 
supported");
+            return doCompactBlobFiles(table, commitUser);
         }
         if (isVectorStoreFile(compactBefore.get(0).fileName())) {
             // TODO: support vector-store file compaction
@@ -165,6 +186,208 @@ public class DataEvolutionCompactTask extends 
AppendCompactTask {
                 partition, 0, null, DataIncrement.emptyIncrement(), 
compactIncrement);
     }
 
+    private CommitMessage doCompactBlobFiles(FileStoreTable table, String 
commitUser)
+            throws Exception {
+        CoreOptions options = table.coreOptions();
+        List<DataFileMeta> sortedCompactBefore = 
sortedByFirstRowId(compactBefore);
+        DataField blobField = blobField(table, options, sortedCompactBefore);
+        checkRowIdsContinuous(sortedCompactBefore);
+        checkArgument(
+                sortedCompactBefore.size() > 1,
+                "Blob compaction task %s should contain at least two files to 
compact.",
+                this);
+
+        RowType blobWriteType = new 
RowType(Collections.singletonList(blobField));
+
+        FileStoreTable readTable = table.copy(BLOB_COMPACT_READ_OPTIONS);
+        AppendOnlyFileStore store = (AppendOnlyFileStore) readTable.store();
+        DataFilePathFactory pathFactory =
+                store.pathFactory().createDataFilePathFactory(partition, 0);
+
+        DataSplit dataSplit =
+                DataSplit.builder()
+                        .withPartition(partition)
+                        .withBucket(0)
+                        .withDataFiles(sortedCompactBefore)
+                        .withBucketPath(pathFactory.parent().toString())
+                        .rawConvertible(false)
+                        .build();
+        RecordReader<InternalRow> reader =
+                
store.newDataEvolutionRead().withReadType(blobWriteType).createReader(dataSplit);
+        FileWriter<InternalRow, DataFileMeta> writer =
+                createBlobFileWriter(table, options, blobWriteType, 
blobField.name(), pathFactory);
+
+        try {
+            reader.forEachRemaining(
+                    row -> {
+                        try {
+                            writer.write(row);
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
+            writer.close();
+        } catch (Exception e) {
+            writer.abort();
+            throw e;
+        }
+
+        long firstRowId = sortedCompactBefore.get(0).nonNullFirstRowId();
+        long minSequenceId = minSequenceId(sortedCompactBefore);
+        long maxSequenceId = maxSequenceId(sortedCompactBefore);
+        DataFileMeta compactedFile =
+                writer.result()
+                        .assignFirstRowId(firstRowId)
+                        .assignSequenceNumber(minSequenceId, maxSequenceId);
+        compactAfter.add(compactedFile);
+        checkArgument(compactAfter.size() == 1, "Blob file compaction should 
produce one file.");
+        checkSameRowRange(sortedCompactBefore, compactAfter);
+
+        CompactIncrement compactIncrement =
+                new CompactIncrement(
+                        sortedCompactBefore,
+                        compactAfter,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptyList());
+        return new CommitMessageImpl(
+                partition, 0, null, DataIncrement.emptyIncrement(), 
compactIncrement);
+    }
+
+    private FileWriter<InternalRow, DataFileMeta> createBlobFileWriter(
+            FileStoreTable table,
+            CoreOptions options,
+            RowType blobWriteType,
+            String blobFieldName,
+            DataFilePathFactory pathFactory) {
+        BlobFileFormat blobFileFormat = new BlobFileFormat();
+        return new RowDataFileWriter(
+                table.fileIO(),
+                RollingFileWriter.createFileWriterContext(
+                        blobFileFormat,
+                        blobWriteType,
+                        new SimpleColStatsCollector.Factory[] 
{NoneSimpleColStatsCollector::new},
+                        "none"),
+                pathFactory.newBlobPath(),
+                blobWriteType,
+                table.schema().id(),
+                () -> new LongCounter(0),
+                new FileIndexOptions(),
+                FileSource.COMPACT,
+                false,
+                options.statsDenseStore(),
+                pathFactory.isExternalPath(),
+                Collections.singletonList(blobFieldName));
+    }
+
+    private List<DataFileMeta> sortedByFirstRowId(List<DataFileMeta> files) {
+        List<DataFileMeta> sorted = new ArrayList<>(files);
+        sorted.sort(comparingLong(DataFileMeta::nonNullFirstRowId));
+        return sorted;
+    }
+
+    private DataField blobField(
+            FileStoreTable table, CoreOptions options, List<DataFileMeta> 
files) {
+        Integer blobFieldId = null;
+        Map<Long, RowType> schemaCache = new HashMap<>();
+        for (DataFileMeta file : files) {
+            checkArgument(
+                    file.writeCols() != null && file.writeCols().size() == 1,
+                    "Blob file %s should contain exactly one write column.",
+                    file);
+            RowType fileRowType =
+                    schemaCache.computeIfAbsent(
+                            file.schemaId(),
+                            schemaId -> 
table.schemaManager().schema(schemaId).logicalRowType());
+            int currentFieldId = 
fileRowType.getField(file.writeCols().get(0)).id();
+            if (blobFieldId == null) {
+                blobFieldId = currentFieldId;
+            } else {
+                checkArgument(
+                        blobFieldId == currentFieldId,
+                        "Blob compact before files %s should contain the same 
field.",
+                        files);
+            }
+        }
+
+        checkArgument(blobFieldId != null, "Blob compaction task should not be 
empty.");
+        checkArgument(
+                table.rowType().containsField(blobFieldId),
+                "Cannot find blob field id %s in latest schema for compaction 
task %s.",
+                blobFieldId,
+                this);
+        DataField field = table.rowType().getField(blobFieldId);
+        Set<String> blobFieldNames =
+                fieldNamesInBlobFile(table.rowType(), 
options.blobInlineField());
+        checkArgument(
+                blobFieldNames.contains(field.name()),
+                "Field %s in latest schema is not a blob file field.",
+                field.name());
+        return field;
+    }
+
+    private void checkRowIdsContinuous(List<DataFileMeta> files) {
+        checkArgument(!files.isEmpty(), "%s should not be empty.", "Blob 
compact before files");
+        long expectedFirstRowId = files.get(0).nonNullFirstRowId();
+        for (DataFileMeta file : files) {
+            long firstRowId = file.nonNullFirstRowId();
+            checkArgument(
+                    firstRowId == expectedFirstRowId,
+                    "%s should be continuous and sorted by row id, expected %s 
but got %s in file %s.",
+                    "Blob compact before files",
+                    expectedFirstRowId,
+                    firstRowId,
+                    file);
+            expectedFirstRowId += file.rowCount();
+        }
+    }
+
+    private void checkSameRowRange(
+            List<DataFileMeta> compactBefore, List<DataFileMeta> compactAfter) 
{
+        checkArgument(
+                !compactBefore.isEmpty(),
+                "%s compact before files should not be empty.",
+                "Blob compact files");
+        checkArgument(
+                !compactAfter.isEmpty(),
+                "%s compact after files should not be empty.",
+                "Blob compact files");
+        long beforeFirstRowId = compactBefore.get(0).nonNullFirstRowId();
+        long afterFirstRowId = compactAfter.get(0).nonNullFirstRowId();
+        long beforeRowCount = 
compactBefore.stream().mapToLong(DataFileMeta::rowCount).sum();
+        long afterRowCount = 
compactAfter.stream().mapToLong(DataFileMeta::rowCount).sum();
+        checkArgument(
+                beforeFirstRowId == afterFirstRowId && beforeRowCount == 
afterRowCount,
+                "%s compact after files should have the same row range as 
compact before files, "
+                        + "before first row id is %s with row count %s, "
+                        + "but after first row id is %s with row count %s.",
+                "Blob compact files",
+                beforeFirstRowId,
+                beforeRowCount,
+                afterFirstRowId,
+                afterRowCount);
+    }
+
+    private long minSequenceId(List<DataFileMeta> files) {
+        return files.stream()
+                .mapToLong(DataFileMeta::minSequenceNumber)
+                .min()
+                .orElseThrow(
+                        () ->
+                                new IllegalStateException(
+                                        "Cannot get min sequence id from 
compact before files."));
+    }
+
+    private long maxSequenceId(List<DataFileMeta> files) {
+        return files.stream()
+                .mapToLong(DataFileMeta::maxSequenceNumber)
+                .max()
+                .orElseThrow(
+                        () ->
+                                new IllegalStateException(
+                                        "Cannot get max sequence id from 
compact before files."));
+    }
+
     @Override
     public int hashCode() {
         return Objects.hash(partition, compactBefore, compactAfter, blobTask);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java
index 4b6d1c579a..7b5758909f 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/MultipleBlobTableTest.java
@@ -19,6 +19,8 @@
 package org.apache.paimon.append;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator;
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.BlobData;
 import org.apache.paimon.data.GenericRow;
@@ -27,17 +29,22 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.operation.DataEvolutionSplitRead.FieldBunch;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
 import static 
org.apache.paimon.operation.DataEvolutionSplitRead.splitFieldBunches;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 
@@ -81,13 +88,93 @@ public class MultipleBlobTableTest extends TableTestBase {
         assertThat(integer.get()).isEqualTo(1000);
     }
 
+    @Test
+    public void testDataEvolutionBlobCompaction() throws Exception {
+        createTableDefault();
+
+        commitDefault(writeDataDefault(50, 20));
+
+        FileStoreTable table = getTableDefault();
+        List<DataFileMeta> before = currentDataFiles(table);
+        long beforeBlobFileCount =
+                before.stream().filter(file -> 
isBlobFile(file.fileName())).count();
+        assertThat(beforeBlobFileCount).isEqualTo(40);
+
+        DataEvolutionCompactCoordinator coordinator =
+                new DataEvolutionCompactCoordinator(table, true, false);
+        List<DataEvolutionCompactTask> tasks = coordinator.plan();
+        
assertThat(tasks.stream().anyMatch(DataEvolutionCompactTask::isBlobTask)).isTrue();
+
+        List<CommitMessage> compactMessages = new ArrayList<>();
+        for (DataEvolutionCompactTask task : tasks) {
+            compactMessages.add(task.doCompact(table, commitUser));
+        }
+        commitDefault(compactMessages);
+
+        List<DataFileMeta> after = currentDataFiles(table);
+        long afterBlobFileCount =
+                after.stream().filter(file -> 
isBlobFile(file.fileName())).count();
+        assertThat(afterBlobFileCount).isLessThan(beforeBlobFileCount);
+        coordinator = new DataEvolutionCompactCoordinator(table, true, false);
+        
assertThat(coordinator.plan().stream().anyMatch(DataEvolutionCompactTask::isBlobTask))
+                .isFalse();
+
+        AtomicInteger integer = new AtomicInteger(0);
+        readDefault(
+                row -> {
+                    integer.incrementAndGet();
+                    if (integer.get() % 50 == 0) {
+                        
assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes1);
+                        
assertThat(row.getBlob(3).toData()).isEqualTo(blobBytes2);
+                    }
+                });
+        assertThat(integer.get()).isEqualTo(1000);
+    }
+
+    @Test
+    public void testDataEvolutionBlobCompactionAfterDropBlobColumns() throws 
Exception {
+        createTableDefault();
+
+        commitDefault(writeDataDefault(1000, 1));
+
+        catalog.alterTable(
+                identifier(),
+                Arrays.asList(SchemaChange.dropColumn("f2"), 
SchemaChange.dropColumn("f3")),
+                false);
+
+        FileStoreTable table = getTableDefault();
+        DataEvolutionCompactCoordinator coordinator =
+                new DataEvolutionCompactCoordinator(table, true, false);
+        List<DataEvolutionCompactTask> tasks = coordinator.plan();
+        
assertThat(tasks.stream().anyMatch(DataEvolutionCompactTask::isBlobTask)).isFalse();
+
+        List<DataFileMeta> after = currentDataFiles(getTableDefault());
+        assertThat(after.stream().filter(file -> 
isBlobFile(file.fileName())).count())
+                .isEqualTo(20);
+
+        AtomicInteger integer = new AtomicInteger(0);
+        readDefault(
+                row -> {
+                    integer.incrementAndGet();
+                    assertThat(row.getFieldCount()).isEqualTo(2);
+                });
+        assertThat(integer.get()).isEqualTo(1000);
+    }
+
+    private List<DataFileMeta> currentDataFiles(FileStoreTable table) {
+        return table.store().newScan().plan().files().stream()
+                .map(ManifestEntry::file)
+                .collect(Collectors.toList());
+    }
+
     protected Schema schemaDefault() {
         Schema.Builder schemaBuilder = Schema.newBuilder();
         schemaBuilder.column("f0", DataTypes.INT());
         schemaBuilder.column("f1", DataTypes.STRING());
         schemaBuilder.column("f2", DataTypes.BLOB());
         schemaBuilder.column("f3", DataTypes.BLOB());
-        schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+        schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "1 GB");
+        schemaBuilder.option(CoreOptions.BLOB_TARGET_FILE_SIZE.key(), "25 MB");
         schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
         schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
         return schemaBuilder.build();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
index ca8a5cbb1b..4946a61539 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
@@ -36,6 +36,9 @@ import org.apache.paimon.stats.StatsTestUtils;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.junit.jupiter.api.Test;
@@ -44,7 +47,12 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.LongFunction;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
@@ -157,15 +165,15 @@ public class DataEvolutionCompactCoordinatorTest {
         // Test blob file compaction when enabled
         List<ManifestEntry> entries = new ArrayList<>();
         entries.add(makeEntry("file1.parquet", 0L, 100L, 100));
-        entries.add(makeBlobEntry("file1.blob", 0L, 100L, 100));
-        entries.add(makeBlobEntry("file1b.blob", 0L, 100L, 100));
+        entries.add(makeBlobEntry("file1.blob", 0L, 50L, 100, "pic"));
+        entries.add(makeBlobEntry("file1b.blob", 50L, 50L, 100, "pic"));
         entries.add(makeEntry("file2.parquet", 100L, 100L, 100));
-        entries.add(makeBlobEntry("file2.blob", 100L, 100L, 100));
-        entries.add(makeBlobEntry("file2b.blob", 100L, 100L, 100));
+        entries.add(makeBlobEntry("file2.blob", 100L, 50L, 100, "pic"));
+        entries.add(makeBlobEntry("file2b.blob", 150L, 50L, 100, "pic"));
 
         // Use small target to trigger compaction, with blob compaction enabled
         DataEvolutionCompactCoordinator.CompactPlanner planner =
-                new DataEvolutionCompactCoordinator.CompactPlanner(true, false, 1024, 1024, 2);
+                blobPlanner(1024, 1024, 2, rowType(new DataField(1, "pic", DataTypes.BLOB())));
 
         List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
 
@@ -182,6 +190,148 @@ public class DataEvolutionCompactCoordinatorTest {
                         entries.get(5).file());
     }
 
+    @Test
+    public void testCompactPlannerSkipsSingleFilePerBlobField() {
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(makeEntry("file1.parquet", 0L, 100L, 100));
+        entries.add(makeBlobEntry("pic1.blob", 0L, 100L, 100, "pic1"));
+        entries.add(makeBlobEntry("pic2.blob", 0L, 100L, 100, "pic2"));
+
+        DataEvolutionCompactCoordinator.CompactPlanner planner =
+                blobPlanner(
+                        1024,
+                        1024,
+                        2,
+                        rowType(
+                                new DataField(1, "pic1", DataTypes.BLOB()),
+                                new DataField(2, "pic2", DataTypes.BLOB())));
+
+        List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
+
+        assertThat(tasks).isEmpty();
+    }
+
+    @Test
+    public void testCompactPlannerCreatesOneBlobTaskPerField() {
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(makeEntry("file1.parquet", 0L, 100L, 100));
+        entries.add(makeBlobEntry("a1-1.blob", 0L, 50L, 100, "a1"));
+        entries.add(makeBlobEntry("a1-2.blob", 50L, 50L, 100, "a1"));
+        entries.add(makeBlobEntry("a2-1.blob", 0L, 30L, 100, "a2"));
+        entries.add(makeBlobEntry("a2-2.blob", 30L, 30L, 100, "a2"));
+        entries.add(makeBlobEntry("a2-3.blob", 60L, 40L, 100, "a2"));
+
+        DataEvolutionCompactCoordinator.CompactPlanner planner =
+                blobPlanner(
+                        1024,
+                        1024,
+                        2,
+                        rowType(
+                                new DataField(1, "a1", DataTypes.BLOB()),
+                                new DataField(2, "a2", DataTypes.BLOB())));
+
+        List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
+
+        assertThat(tasks).hasSize(2);
+        assertThat(tasks).allMatch(DataEvolutionCompactTask::isBlobTask);
+        assertThat(tasks.get(0).compactBefore())
+                .containsExactly(entries.get(1).file(), entries.get(2).file());
+        assertThat(tasks.get(1).compactBefore())
+                .containsExactly(
+                        entries.get(3).file(), entries.get(4).file(), entries.get(5).file());
+    }
+
+    @Test
+    public void testCompactPlannerSplitsBlobFilesByTargetSizeAndRowId() {
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(makeEntry("file1.parquet", 0L, 70L, 100));
+        entries.add(makeBlobEntry("a.blob", 0L, 10L, 400, "pic"));
+        entries.add(makeBlobEntry("b.blob", 10L, 10L, 400, "pic"));
+        entries.add(makeBlobEntry("c.blob", 20L, 10L, 1024, "pic"));
+        entries.add(makeBlobEntry("d.blob", 30L, 10L, 600, "pic"));
+        entries.add(makeBlobEntry("e.blob", 40L, 10L, 600, "pic"));
+        entries.add(makeBlobEntry("f.blob", 50L, 10L, 400, "pic"));
+        entries.add(makeBlobEntry("g.blob", 60L, 10L, 500, "pic"));
+
+        DataEvolutionCompactCoordinator.CompactPlanner planner =
+                blobPlanner(1024, 1, 2, rowType(new DataField(1, "pic", DataTypes.BLOB())));
+
+        List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
+
+        assertThat(tasks).hasSize(3);
+        assertThat(tasks).allMatch(DataEvolutionCompactTask::isBlobTask);
+        assertThat(tasks.get(0).compactBefore())
+                .containsExactly(entries.get(1).file(), entries.get(2).file());
+        assertThat(tasks.get(1).compactBefore())
+                .containsExactly(entries.get(4).file(), entries.get(5).file());
+        assertThat(tasks.get(2).compactBefore())
+                .containsExactly(entries.get(6).file(), entries.get(7).file());
+    }
+
+    @Test
+    public void testCompactPlannerSkipsBlobGroupsWithTooFewSmallFiles() {
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(makeEntry("file1.parquet", 0L, 200L, 100));
+        entries.add(makeBlobEntry("large-pic.blob", 0L, 100L, 1024, "pic"));
+        entries.add(makeBlobEntry("small-pic.blob", 100L, 100L, 100, "pic"));
+
+        DataEvolutionCompactCoordinator.CompactPlanner planner =
+                blobPlanner(1024, 1, 2, rowType(new DataField(1, "pic", DataTypes.BLOB())));
+
+        List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
+
+        assertThat(tasks).isEmpty();
+    }
+
+    @Test
+    public void testCompactPlannerGroupsBlobFilesByFieldId() {
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(makeEntry("file1.parquet", 0L, 200L, 100));
+        entries.add(makeBlobEntry("old-pic.blob", 0L, 100L, 100, 0, "old_pic"));
+        entries.add(makeBlobEntry("new-pic.blob", 100L, 100L, 100, 1, "new_pic"));
+
+        Map<Long, RowType> schemas = new HashMap<>();
+        schemas.put(0L, rowType(new DataField(1, "old_pic", DataTypes.BLOB())));
+        schemas.put(1L, rowType(new DataField(1, "new_pic", DataTypes.BLOB())));
+        DataEvolutionCompactCoordinator.CompactPlanner planner =
+                blobPlanner(
+                        1024,
+                        1024,
+                        2,
+                        schemaId -> schemas.get(schemaId),
+                        rowType(new DataField(1, "new_pic", DataTypes.BLOB())));
+
+        List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
+
+        assertThat(tasks).hasSize(1);
+        assertThat(tasks.get(0).isBlobTask()).isTrue();
+        assertThat(tasks.get(0).compactBefore())
+                .containsExactly(entries.get(1).file(), entries.get(2).file());
+    }
+
+    @Test
+    public void testCompactPlannerSeparatesBlobFilesByFieldIdWhenNameReused() {
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(makeEntry("file1.parquet", 0L, 100L, 100));
+        entries.add(makeBlobEntry("old-pic.blob", 0L, 100L, 100, 0, "pic"));
+        entries.add(makeBlobEntry("new-pic.blob", 0L, 100L, 100, 1, "pic"));
+
+        Map<Long, RowType> schemas = new HashMap<>();
+        schemas.put(0L, rowType(new DataField(1, "pic", DataTypes.BLOB())));
+        schemas.put(1L, rowType(new DataField(2, "pic", DataTypes.BLOB())));
+        DataEvolutionCompactCoordinator.CompactPlanner planner =
+                blobPlanner(
+                        1024,
+                        1024,
+                        2,
+                        schemaId -> schemas.get(schemaId),
+                        rowType(new DataField(2, "pic", DataTypes.BLOB())));
+
+        List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
+
+        assertThat(tasks).isEmpty();
+    }
+
     @Test
     public void testPlanWithNullManifestRowId() {
         FileStoreTable table = mock(FileStoreTable.class);
@@ -286,6 +436,21 @@ public class DataEvolutionCompactCoordinatorTest {
 
     private ManifestEntry makeBlobEntry(
             String fileName, long firstRowId, long rowCount, long fileSize) {
+        return makeBlobEntry(fileName, firstRowId, rowCount, fileSize, null);
+    }
+
+    private ManifestEntry makeBlobEntry(
+            String fileName, long firstRowId, long rowCount, long fileSize, String writeCol) {
+        return makeBlobEntry(fileName, firstRowId, rowCount, fileSize, 0, writeCol);
+    }
+
+    private ManifestEntry makeBlobEntry(
+            String fileName,
+            long firstRowId,
+            long rowCount,
+            long fileSize,
+            long schemaId,
+            String writeCol) {
         // Blob files have .blob extension
         String blobFileName = fileName.endsWith(".blob") ? fileName : fileName + ".blob";
         return ManifestEntry.create(
@@ -293,11 +458,39 @@ public class DataEvolutionCompactCoordinatorTest {
                 BinaryRow.EMPTY_ROW,
                 0,
                 0,
-                createDataFileMeta(blobFileName, firstRowId, rowCount, 0, fileSize));
+                createDataFileMeta(
+                        blobFileName,
+                        firstRowId,
+                        rowCount,
+                        0,
+                        fileSize,
+                        schemaId,
+                        writeCol == null ? null : Collections.singletonList(writeCol)));
     }
 
     private DataFileMeta createDataFileMeta(
             String fileName, long firstRowId, long rowCount, long maxSeq, long fileSize) {
+        return createDataFileMeta(fileName, firstRowId, rowCount, maxSeq, fileSize, 0, null);
+    }
+
+    private DataFileMeta createDataFileMeta(
+            String fileName,
+            long firstRowId,
+            long rowCount,
+            long maxSeq,
+            long fileSize,
+            List<String> writeCols) {
+        return createDataFileMeta(fileName, firstRowId, rowCount, maxSeq, fileSize, 0, writeCols);
+    }
+
+    private DataFileMeta createDataFileMeta(
+            String fileName,
+            long firstRowId,
+            long rowCount,
+            long maxSeq,
+            long fileSize,
+            long schemaId,
+            List<String> writeCols) {
         return DataFileMeta.create(
                 fileName,
                 fileSize,
@@ -308,7 +501,7 @@ public class DataEvolutionCompactCoordinatorTest {
                 StatsTestUtils.newEmptySimpleStats(),
                 0,
                 maxSeq,
-                0,
+                schemaId,
                 0,
                 Collections.emptyList(),
                 Timestamp.fromEpochMillis(System.currentTimeMillis()),
@@ -318,7 +511,42 @@ public class DataEvolutionCompactCoordinatorTest {
                 null,
                 null,
                 firstRowId,
-                null);
+                writeCols);
+    }
+
+    private DataEvolutionCompactCoordinator.CompactPlanner blobPlanner(
+            long targetFileSize, long openFileCost, long compactMinFileNum, RowType rowType) {
+        return blobPlanner(
+                targetFileSize, openFileCost, compactMinFileNum, schemaId -> rowType, rowType);
+    }
+
+    private DataEvolutionCompactCoordinator.CompactPlanner blobPlanner(
+            long targetFileSize,
+            long openFileCost,
+            long compactMinFileNum,
+            LongFunction<RowType> schemaFetcher,
+            RowType currentRowType) {
+        return new DataEvolutionCompactCoordinator.CompactPlanner(
+                true,
+                false,
+                targetFileSize,
+                targetFileSize,
+                openFileCost,
+                compactMinFileNum,
+                schemaFetcher,
+                fieldIds(currentRowType));
+    }
+
+    private RowType rowType(DataField... fields) {
+        return new RowType(Arrays.asList(fields));
+    }
+
+    private Set<Integer> fieldIds(RowType rowType) {
+        Set<Integer> fieldIds = new HashSet<>();
+        for (DataField field : rowType.getFields()) {
+            fieldIds.add(field.id());
+        }
+        return fieldIds;
     }
 
     @Test
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataEvolutionTableCompactSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataEvolutionTableCompactSource.java
index f712c471e4..24e768cc40 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataEvolutionTableCompactSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataEvolutionTableCompactSource.java
@@ -80,7 +80,8 @@ public class DataEvolutionTableCompactSource
 
         public CompactSourceReader(FileStoreTable table, PartitionPredicate partitions) {
             compactionCoordinator =
-                    new DataEvolutionCompactCoordinator(table, partitions, false, false);
+                    new DataEvolutionCompactCoordinator(
+                            table, partitions, table.coreOptions().blobCompactionEnabled(), false);
         }
 
         @Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
index 1b56bdd74d..b2238ce04c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
@@ -59,9 +59,9 @@ public class BlobTableITCase extends CatalogITCaseBase {
     protected List<String> ddl() {
         String externalStoragePath = warehouse.resolve("external-storage-blob-path").toString();
         return Arrays.asList(
-                "CREATE TABLE IF NOT EXISTS blob_table (id INT, data STRING, picture BYTES) WITH ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='picture')",
+                "CREATE TABLE IF NOT EXISTS blob_table (id INT, data STRING, picture BYTES) WITH ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-compaction.enabled'='true', 'blob-field'='picture')",
                 "CREATE TABLE IF NOT EXISTS blob_table_descriptor (id INT, data STRING, picture BYTES) WITH ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='picture', 'blob-as-descriptor'='true')",
-                "CREATE TABLE IF NOT EXISTS multiple_blob_table (id INT, data STRING, pic1 BYTES, pic2 BYTES) WITH ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='pic1,pic2')",
+                "CREATE TABLE IF NOT EXISTS multiple_blob_table (id INT, data STRING, pic1 BYTES, pic2 BYTES) WITH ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-compaction.enabled'='true', 'blob-field'='pic1,pic2')",
                 String.format(
                         "CREATE TABLE IF NOT EXISTS copy_blob_table (id INT, data STRING, picture BYTES)"
                                 + " WITH ('row-tracking.enabled'='true', 'data-evolution.enabled'='true',"
@@ -108,6 +108,118 @@ public class BlobTableITCase extends CatalogITCaseBase {
                 .isEqualTo(3);
     }
 
+    @Test
+    public void testBlobCompaction() throws Exception {
+        for (int i = 1; i <= 10; i++) {
+            batchSql("INSERT INTO blob_table VALUES (%s, 'paimon', X'48656C6C6F')", i);
+        }
+        batchSql("INSERT INTO blob_table VALUES (1, 'paimon', X'48656C6C6F')");
+
+        assertThat(batchSql("SELECT COUNT(*) FROM `blob_table$files`"))
+                .containsExactly(Row.of(22L));
+        assertThat(
+                        batchSql(
+                                "SELECT COUNT(*) FROM `blob_table$files` WHERE file_path LIKE '%%.blob'"))
+                .containsExactly(Row.of(11L));
+        assertThat(
+                        batchSql(
+                                "SELECT COUNT(*) FROM `blob_table$files` WHERE file_path NOT LIKE '%%.blob'"))
+                .containsExactly(Row.of(11L));
+
+        tEnv.getConfig().set("table.dml-sync", "true");
+        tEnv.executeSql("CALL sys.compact(`table` => 'default.blob_table')").await();
+
+        assertThat(batchSql("SELECT COUNT(*) FROM `blob_table$files`")).containsExactly(Row.of(2L));
+        assertThat(
+                        batchSql(
+                                "SELECT COUNT(*) FROM `blob_table$files` WHERE file_path LIKE '%%.blob'"))
+                .containsExactly(Row.of(1L));
+        assertThat(
+                        batchSql(
+                                "SELECT COUNT(*) FROM `blob_table$files` WHERE file_path NOT LIKE '%%.blob'"))
+                .containsExactly(Row.of(1L));
+        assertThat(batchSql("SELECT COUNT(*) FROM blob_table")).containsExactly(Row.of(11L));
+        assertThat(batchSql("SELECT picture FROM blob_table WHERE id = 1"))
+                .containsExactlyInAnyOrder(
+                        Row.of(new byte[] {72, 101, 108, 108, 111}),
+                        Row.of(new byte[] {72, 101, 108, 108, 111}));
+    }
+
+    @Test
+    public void testMultipleBlobCompaction() throws Exception {
+        for (int i = 1; i <= 10; i++) {
+            batchSql(
+                    "INSERT INTO multiple_blob_table VALUES (%s, 'paimon', X'48656C6C6F', X'5945')",
+                    i);
+        }
+        batchSql("INSERT INTO multiple_blob_table VALUES (1, 'paimon', X'48656C6C6F', X'5945')");
+
+        assertThat(batchSql("SELECT COUNT(*) FROM `multiple_blob_table$files`"))
+                .containsExactly(Row.of(33L));
+        assertThat(
+                        batchSql(
+                                "SELECT COUNT(*) FROM `multiple_blob_table$files` WHERE file_path LIKE '%%.blob'"))
+                .containsExactly(Row.of(22L));
+        assertThat(
+                        batchSql(
+                                "SELECT COUNT(*) FROM `multiple_blob_table$files` WHERE file_path NOT LIKE '%%.blob'"))
+                .containsExactly(Row.of(11L));
+
+        tEnv.getConfig().set("table.dml-sync", "true");
+        tEnv.executeSql("CALL sys.compact(`table` => 'default.multiple_blob_table')").await();
+
+        assertThat(batchSql("SELECT COUNT(*) FROM `multiple_blob_table$files`"))
+                .containsExactly(Row.of(3L));
+        assertThat(
+                        batchSql(
+                                "SELECT COUNT(*) FROM `multiple_blob_table$files` WHERE file_path LIKE '%%.blob'"))
+                .containsExactly(Row.of(2L));
+        assertThat(
+                        batchSql(
+                                "SELECT COUNT(*) FROM `multiple_blob_table$files` WHERE file_path NOT LIKE '%%.blob'"))
+                .containsExactly(Row.of(1L));
+        assertThat(batchSql("SELECT COUNT(*) FROM multiple_blob_table"))
+                .containsExactly(Row.of(11L));
+        assertThat(batchSql("SELECT pic1, pic2 FROM multiple_blob_table WHERE id = 1"))
+                .containsExactlyInAnyOrder(
+                        Row.of(new byte[] {72, 101, 108, 108, 111}, new byte[] {89, 69}),
+                        Row.of(new byte[] {72, 101, 108, 108, 111}, new byte[] {89, 69}));
+    }
+
+    @Test
+    public void testBlobCompactionDisabledByDefault() throws Exception {
+        tEnv.executeSql(
+                "CREATE TABLE blob_compaction_disabled (id INT, data STRING, picture BYTES) "
+                        + "WITH ('row-tracking.enabled'='true', "
+                        + "'data-evolution.enabled'='true', "
+                        + "'blob-field'='picture')");
+
+        for (int i = 1; i <= 10; i++) {
+            batchSql(
+                    "INSERT INTO blob_compaction_disabled VALUES (%s, 'paimon', X'48656C6C6F')", i);
+        }
+        batchSql("INSERT INTO blob_compaction_disabled VALUES (1, 'paimon', X'48656C6C6F')");
+
+        assertThat(
+                        batchSql(
+                                "SELECT COUNT(*) FROM `blob_compaction_disabled$files` WHERE file_path LIKE '%%.blob'"))
+                .containsExactly(Row.of(11L));
+
+        tEnv.getConfig().set("table.dml-sync", "true");
+        tEnv.executeSql("CALL sys.compact(`table` => 'default.blob_compaction_disabled')").await();
+
+        assertThat(
+                        batchSql(
+                                "SELECT COUNT(*) FROM `blob_compaction_disabled$files` WHERE file_path LIKE '%%.blob'"))
+                .containsExactly(Row.of(11L));
+        assertThat(
+                        batchSql(
+                                "SELECT COUNT(*) FROM `blob_compaction_disabled$files` WHERE file_path NOT LIKE '%%.blob'"))
+                .containsExactly(Row.of(1L));
+        assertThat(batchSql("SELECT COUNT(*) FROM blob_compaction_disabled"))
+                .containsExactly(Row.of(11L));
+    }
+
     @Test
     public void testWriteBlobAsDescriptor() throws Exception {
         byte[] blobData = new byte[1024 * 1024];

Reply via email to