This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 01f0764ae [core] Expose `IndexFile` in `Split`. (#3226)
01f0764ae is described below

commit 01f0764aef662a7086e4476b446acf8fa965b96e
Author: YeJunHao <[email protected]>
AuthorDate: Fri Apr 26 09:44:28 2024 +0800

    [core] Expose `IndexFile` in `Split`. (#3226)
---
 .../paimon/operation/AppendOnlyFileStoreWrite.java |   5 +
 .../org/apache/paimon/table/source/DataSplit.java  | 105 ++++++++++++++++-----
 .../org/apache/paimon/table/source/IndexFile.java  |  54 +++++++++++
 .../java/org/apache/paimon/table/source/Split.java |  10 ++
 .../snapshot/IncrementalStartingScanner.java       |  87 ++++++++++++++---
 .../table/source/snapshot/SnapshotReaderImpl.java  |  37 ++------
 .../test/java/org/apache/paimon/TestFileStore.java |   2 +
 .../paimon/crosspartition/IndexBootstrapTest.java  |   1 +
 .../paimon/operation/MergeFileSplitReadTest.java   |   1 +
 .../org/apache/paimon/table/source/SplitTest.java  |   1 +
 .../table/source/snapshot/SnapshotReaderTest.java  |  87 +++++++++++++++++
 .../flink/source/align/PlaceholderSplit.java       |   4 +
 .../SortCompactActionForUnawareBucketITCase.java   |   2 +
 .../source/ContinuousFileSplitEnumeratorTest.java  |   2 +
 .../flink/source/FileSplitEnumeratorTestBase.java  |   1 +
 .../source/FileStoreSourceSplitGeneratorTest.java  |   2 +
 .../source/FileStoreSourceSplitSerializerTest.java |   2 +
 .../paimon/hive/mapred/PaimonInputSplitTest.java   |   2 +
 paimon-spark/paimon-spark-common/pom.xml           |   4 -
 .../scala/org/apache/paimon/spark/ScanHelper.scala |  14 +--
 .../commands/DeleteFromPaimonTableCommand.scala    |   5 +-
 .../paimon/spark/commands/SparkDataFileMeta.scala  |   7 +-
 .../spark/commands/UpdatePaimonTableCommand.scala  |   6 +-
 .../org/apache/paimon/spark/ScanHelperTest.scala   |   6 +-
 24 files changed, 363 insertions(+), 84 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index fc3f2a3d6..bb3074ac7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -197,6 +197,11 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
                                         .withPartition(partition)
                                         .withBucket(bucket)
                                         .withDataFiles(files)
+                                        .rawConvertible(true)
+                                        .withBucketPath(
+                                                pathFactory
+                                                        .bucketPath(partition, 
bucket)
+                                                        .toString())
                                         .build()));
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index fbd5f723e..027aa0662 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -33,12 +33,13 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.stream.Collectors;
 
+import static org.apache.paimon.io.DataFilePathFactory.INDEX_PATH_SUFFIX;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Input splits. Needed by most batch computation engines. */
@@ -56,7 +57,8 @@ public class DataSplit implements Split {
     private List<DataFileMeta> dataFiles;
     @Nullable private List<DeletionFile> dataDeletionFiles;
 
-    private List<RawFile> rawFiles = Collections.emptyList();
+    private boolean rawConvertible;
+    private String bucketPath;
 
     public DataSplit() {}
 
@@ -93,6 +95,14 @@ public class DataSplit implements Split {
         return isStreaming;
     }
 
+    public boolean rawConvertible() {
+        return rawConvertible;
+    }
+
+    public String getBucketPath() {
+        return bucketPath;
+    }
+
     public OptionalLong getLatestFileCreationEpochMillis() {
         return 
this.dataFiles.stream().mapToLong(DataFileMeta::creationTimeEpochMillis).max();
     }
@@ -108,13 +118,61 @@ public class DataSplit implements Split {
 
     @Override
     public Optional<List<RawFile>> convertToRawFiles() {
-        if (rawFiles.isEmpty()) {
-            return Optional.empty();
+        if (rawConvertible) {
+            return Optional.of(
+                    dataFiles.stream()
+                            .map(f -> makeRawTableFile(bucketPath, f))
+                            .collect(Collectors.toList()));
         } else {
-            return Optional.of(rawFiles);
+            return Optional.empty();
         }
     }
 
+    private RawFile makeRawTableFile(String bucketPath, DataFileMeta meta) {
+        return new RawFile(
+                bucketPath + "/" + meta.fileName(),
+                0,
+                meta.fileSize(),
+                meta.fileFormat()
+                        .map(t -> t.toString().toLowerCase())
+                        .orElseThrow(
+                                () ->
+                                        new RuntimeException(
+                                                "Can't find format from file: "
+                                                        + bucketPath
+                                                        + "/"
+                                                        + meta.fileName())),
+                meta.schemaId(),
+                meta.rowCount());
+    }
+
+    @Override
+    @Nullable
+    public Optional<List<IndexFile>> indexFiles() {
+        List<IndexFile> indexFiles = new ArrayList<>();
+        boolean hasIndexFile = false;
+        for (DataFileMeta file : dataFiles) {
+            List<String> exFiles =
+                    file.extraFiles().stream()
+                            .filter(s -> s.endsWith(INDEX_PATH_SUFFIX))
+                            .collect(Collectors.toList());
+            if (exFiles.isEmpty()) {
+                indexFiles.add(null);
+            } else if (exFiles.size() == 1) {
+                hasIndexFile = true;
+                indexFiles.add(new IndexFile(bucketPath + "/" + 
exFiles.get(0)));
+            } else {
+                throw new RuntimeException(
+                        "Wrong number of file index for file "
+                                + file.fileName()
+                                + " index files: "
+                                + String.join(",", exFiles));
+            }
+        }
+
+        return hasIndexFile ? Optional.of(indexFiles) : Optional.empty();
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -131,7 +189,8 @@ public class DataSplit implements Split {
                 && Objects.equals(dataFiles, split.dataFiles)
                 && Objects.equals(dataDeletionFiles, split.dataDeletionFiles)
                 && isStreaming == split.isStreaming
-                && Objects.equals(rawFiles, split.rawFiles);
+                && rawConvertible == split.rawConvertible
+                && Objects.equals(bucketPath, split.bucketPath);
     }
 
     @Override
@@ -144,7 +203,8 @@ public class DataSplit implements Split {
                 dataFiles,
                 dataDeletionFiles,
                 isStreaming,
-                rawFiles);
+                rawConvertible,
+                bucketPath);
     }
 
     private void writeObject(ObjectOutputStream out) throws IOException {
@@ -164,7 +224,8 @@ public class DataSplit implements Split {
         this.dataFiles = other.dataFiles;
         this.dataDeletionFiles = other.dataDeletionFiles;
         this.isStreaming = other.isStreaming;
-        this.rawFiles = other.rawFiles;
+        this.rawConvertible = other.rawConvertible;
+        this.bucketPath = other.bucketPath;
     }
 
     public void serialize(DataOutputView out) throws IOException {
@@ -189,10 +250,8 @@ public class DataSplit implements Split {
 
         out.writeBoolean(isStreaming);
 
-        out.writeInt(rawFiles.size());
-        for (RawFile rawFile : rawFiles) {
-            rawFile.serialize(out);
-        }
+        out.writeBoolean(rawConvertible);
+        out.writeUTF(bucketPath);
     }
 
     public static DataSplit deserialize(DataInputView in) throws IOException {
@@ -218,12 +277,8 @@ public class DataSplit implements Split {
         List<DeletionFile> dataDeletionFiles = 
DeletionFile.deserializeList(in);
 
         boolean isStreaming = in.readBoolean();
-
-        int rawFileNum = in.readInt();
-        List<RawFile> rawFiles = new ArrayList<>();
-        for (int i = 0; i < rawFileNum; i++) {
-            rawFiles.add(RawFile.deserialize(in));
-        }
+        boolean rawConvertible = in.readBoolean();
+        String bucketPath = in.readUTF();
 
         DataSplit.Builder builder =
                 builder()
@@ -233,7 +288,9 @@ public class DataSplit implements Split {
                         .withBeforeFiles(beforeFiles)
                         .withDataFiles(dataFiles)
                         .isStreaming(isStreaming)
-                        .rawFiles(rawFiles);
+                        .rawConvertible(rawConvertible)
+                        .withBucketPath(bucketPath);
+
         if (beforeDeletionFiles != null) {
             builder.withBeforeDeletionFiles(beforeDeletionFiles);
         }
@@ -292,8 +349,13 @@ public class DataSplit implements Split {
             return this;
         }
 
-        public Builder rawFiles(List<RawFile> rawFiles) {
-            this.split.rawFiles = rawFiles;
+        public Builder rawConvertible(boolean rawConvertible) {
+            this.split.rawConvertible = rawConvertible;
+            return this;
+        }
+
+        public Builder withBucketPath(String bucketPath) {
+            this.split.bucketPath = bucketPath;
             return this;
         }
 
@@ -301,6 +363,7 @@ public class DataSplit implements Split {
             checkArgument(split.partition != null);
             checkArgument(split.bucket != -1);
             checkArgument(split.dataFiles != null);
+            checkArgument(split.bucketPath != null);
 
             DataSplit split = new DataSplit();
             split.assign(this.split);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/IndexFile.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/IndexFile.java
new file mode 100644
index 000000000..1c7814d4e
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/IndexFile.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.io.DataOutputView;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/** Index file for data file. */
+public class IndexFile {
+
+    private final String path;
+
+    public IndexFile(String path) {
+        this.path = path;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof IndexFile)) {
+            return false;
+        }
+
+        IndexFile other = (IndexFile) o;
+        return Objects.equals(path, other.path);
+    }
+
+    public void serialize(DataOutputView out) throws IOException {
+        out.writeUTF(path);
+    }
+
+    public static IndexFile deserialize(DataInputView in) throws IOException {
+        String path = in.readUTF();
+        return new IndexFile(path);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java
index adefb868f..d6cb381e2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java
@@ -51,4 +51,14 @@ public interface Split extends Serializable {
     default Optional<List<DeletionFile>> deletionFiles() {
         return Optional.empty();
     }
+
+    /**
+     * Return the index file of the data file, for example, bloom-filter 
index. All types of
+     * indexes and columns will be stored in one single index file.
+     *
+     * <p>If there is no corresponding index file, the element will be null.
+     */
+    default Optional<List<IndexFile>> indexFiles() {
+        return Optional.empty();
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
index 602a6370a..86705ce44 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
@@ -23,17 +23,22 @@ import org.apache.paimon.Snapshot.CommitKind;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.table.source.PlanImpl;
 import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.SplitGenerator;
-import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /** {@link StartingScanner} for incremental changes by snapshot. */
 public class IncrementalStartingScanner extends AbstractStartingScanner {
@@ -52,34 +57,49 @@ public class IncrementalStartingScanner extends 
AbstractStartingScanner {
 
     @Override
     public Result scan(SnapshotReader reader) {
-        Map<Pair<BinaryRow, Integer>, List<DataFileMeta>> grouped = new 
HashMap<>();
+        Map<SplitInfo, List<DataFileMeta>> grouped = new HashMap<>();
         for (long i = startingSnapshotId + 1; i < endingSnapshotId + 1; i++) {
             List<DataSplit> splits = readSplits(reader, 
snapshotManager.snapshot(i));
             for (DataSplit split : splits) {
                 grouped.computeIfAbsent(
-                                Pair.of(split.partition(), split.bucket()), k 
-> new ArrayList<>())
+                                new SplitInfo(
+                                        split.partition(),
+                                        split.bucket(),
+                                        // always false, because reading multiple 
snapshots may
+                                        // require merging for a primary key table
+                                        false,
+                                        split.getBucketPath(),
+                                        split.deletionFiles().orElse(null)),
+                                k -> new ArrayList<>())
                         .addAll(split.dataFiles());
             }
         }
 
-        List<DataSplit> result = new ArrayList<>();
-        for (Map.Entry<Pair<BinaryRow, Integer>, List<DataFileMeta>> entry : 
grouped.entrySet()) {
-            BinaryRow partition = entry.getKey().getLeft();
-            int bucket = entry.getKey().getRight();
+        List<Split> result = new ArrayList<>();
+        for (Map.Entry<SplitInfo, List<DataFileMeta>> entry : 
grouped.entrySet()) {
+            BinaryRow partition = entry.getKey().partition;
+            int bucket = entry.getKey().bucket;
+            boolean rawConvertible = entry.getKey().rawConvertible;
+            String bucketPath = entry.getKey().bucketPath;
+            List<DeletionFile> deletionFiles = entry.getKey().deletionFiles;
             for (SplitGenerator.SplitGroup splitGroup :
                     reader.splitGenerator().splitForBatch(entry.getValue())) {
-                // TODO pass deletion files
-                result.add(
+                DataSplit.Builder dataSplitBuilder =
                         DataSplit.builder()
                                 .withSnapshot(endingSnapshotId)
                                 .withPartition(partition)
                                 .withBucket(bucket)
                                 .withDataFiles(splitGroup.files)
-                                .build());
+                                .rawConvertible(rawConvertible)
+                                .withBucketPath(bucketPath);
+                if (deletionFiles != null) {
+                    dataSplitBuilder.withDataDeletionFiles(deletionFiles);
+                }
+                result.add(dataSplitBuilder.build());
             }
         }
 
-        return StartingScanner.fromPlan(new PlanImpl(null, endingSnapshotId, 
(List) result));
+        return StartingScanner.fromPlan(new PlanImpl(null, endingSnapshotId, 
result));
     }
 
     private List<DataSplit> readSplits(SnapshotReader reader, Snapshot s) {
@@ -110,4 +130,49 @@ public class IncrementalStartingScanner extends 
AbstractStartingScanner {
         }
         return (List) 
reader.withSnapshot(s).withMode(ScanMode.CHANGELOG).read().splits();
     }
+
+    /** Split information to pass. */
+    private static class SplitInfo {
+
+        private final BinaryRow partition;
+        private final int bucket;
+        private final boolean rawConvertible;
+        private final String bucketPath;
+        @Nullable private final List<DeletionFile> deletionFiles;
+
+        private SplitInfo(
+                BinaryRow partition,
+                int bucket,
+                boolean rawConvertible,
+                String bucketPath,
+                @Nullable List<DeletionFile> deletionFiles) {
+            this.partition = partition;
+            this.bucket = bucket;
+            this.rawConvertible = rawConvertible;
+            this.bucketPath = bucketPath;
+            this.deletionFiles = deletionFiles;
+        }
+
+        @Override
+        public int hashCode() {
+            return Arrays.hashCode(
+                    new Object[] {partition, bucket, rawConvertible, 
bucketPath, deletionFiles});
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+
+            if (!(obj instanceof SplitInfo)) {
+                return false;
+            }
+
+            SplitInfo that = (SplitInfo) obj;
+
+            return Objects.equals(partition, that.partition)
+                    && bucket == that.bucket
+                    && rawConvertible == that.rawConvertible
+                    && Objects.equals(bucketPath, that.bucketPath)
+                    && Objects.equals(deletionFiles, that.deletionFiles);
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 205dfb3a1..c36427298 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -40,7 +40,6 @@ import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.table.source.PlanImpl;
-import org.apache.paimon.table.source.RawFile;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.types.RowType;
@@ -290,11 +289,10 @@ public class SnapshotReaderImpl implements SnapshotReader 
{
                                 : null;
                 for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
                     List<DataFileMeta> dataFiles = splitGroup.files;
-                    builder.withDataFiles(dataFiles);
-                    builder.rawFiles(
-                            splitGroup.rawConvertible
-                                    ? convertToRawFiles(partition, bucket, 
dataFiles)
-                                    : Collections.emptyList());
+                    String bucketPath = pathFactory.bucketPath(partition, 
bucket).toString();
+                    builder.withDataFiles(dataFiles)
+                            .rawConvertible(splitGroup.rawConvertible)
+                            .withBucketPath(bucketPath);
                     if (deletionVectors) {
                         builder.withDataDeletionFiles(
                                 getDeletionFiles(dataFiles, 
deletionIndexFile));
@@ -371,7 +369,8 @@ public class SnapshotReaderImpl implements SnapshotReader {
                                 .withBucket(bucket)
                                 .withBeforeFiles(before)
                                 .withDataFiles(data)
-                                .isStreaming(isStreaming);
+                                .isStreaming(isStreaming)
+                                .withBucketPath(pathFactory.bucketPath(part, 
bucket).toString());
                 if (deletionVectors) {
                     IndexFileMeta beforeDeletionIndex =
                             indexFileHandler
@@ -433,28 +432,4 @@ public class SnapshotReaderImpl implements SnapshotReader {
 
         return deletionFiles;
     }
-
-    private List<RawFile> convertToRawFiles(
-            BinaryRow partition, int bucket, List<DataFileMeta> dataFiles) {
-        String bucketPath = pathFactory.bucketPath(partition, 
bucket).toString();
-        return dataFiles.stream()
-                .map(f -> makeRawTableFile(bucketPath, f))
-                .collect(Collectors.toList());
-    }
-
-    private RawFile makeRawTableFile(String bucketPath, DataFileMeta meta) {
-        return new RawFile(
-                bucketPath + "/" + meta.fileName(),
-                0,
-                meta.fileSize(),
-                meta.fileFormat()
-                        .map(t -> t.toString().toLowerCase())
-                        .orElse(
-                                new CoreOptions(tableSchema.options())
-                                        .formatType()
-                                        .toString()
-                                        .toLowerCase()),
-                meta.schemaId(),
-                meta.rowCount());
-    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 6adc3aff0..e71f5d281 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -439,6 +439,8 @@ public class TestFileStore extends KeyValueFileStore {
                                                 
.withBucket(entryWithBucket.getKey())
                                                 
.withDataFiles(entryWithBucket.getValue())
                                                 .isStreaming(isStreaming)
+                                                .rawConvertible(false)
+                                                .withBucketPath("not used")
                                                 .build()));
                 while (iterator.hasNext()) {
                     kvs.add(iterator.next().copy(keySerializer, 
valueSerializer));
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
index 3eff9b7cd..b8950772a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
@@ -117,6 +117,7 @@ public class IndexBootstrapTest extends TableTestBase {
                 .withPartition(EMPTY_ROW)
                 .withBucket(0)
                 .withDataFiles(Arrays.asList(files))
+                .withBucketPath("") // not used
                 .build();
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
index 806c869f9..5652fcd43 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
@@ -246,6 +246,7 @@ public class MergeFileSplitReadTest {
                                             entry.getValue().stream()
                                                     .map(ManifestEntry::file)
                                                     
.collect(Collectors.toList()))
+                                    .withBucketPath("not used")
                                     .build());
             RecordReaderIterator<KeyValue> actualIterator = new 
RecordReaderIterator<>(reader);
             while (actualIterator.hasNext()) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
index 7fbd8f96d..b5dda8df7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
@@ -50,6 +50,7 @@ public class SplitTest {
                         .withPartition(data.partition)
                         .withBucket(data.bucket)
                         .withDataFiles(files)
+                        .withBucketPath("my path")
                         .build();
 
         ByteArrayOutputStream out = new ByteArrayOutputStream();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java
index a34f16a03..5788e9722 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndex;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.Path;
@@ -38,6 +39,7 @@ import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.IndexFile;
 import org.apache.paimon.table.source.RawFile;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
@@ -55,6 +57,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import static org.apache.paimon.io.DataFilePathFactory.INDEX_PATH_SUFFIX;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link SnapshotReader}. */
@@ -255,6 +258,82 @@ public class SnapshotReaderTest {
         commit.close();
     }
 
+    @Test
+    public void testGetAppendOnlyIndexFiles() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
+                        new String[] {"k", "v"});
+        FileStoreTable table =
+                createFileStoreTable(rowType, Collections.emptyList(), 
Collections.emptyList());
+
+        String commitUser = UUID.randomUUID().toString();
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+        SnapshotReader reader = table.newSnapshotReader();
+
+        // write one file
+
+        write.write(GenericRow.of(11, 1101L));
+        write.write(GenericRow.of(12, 1201L));
+        write.write(GenericRow.of(21, 2101L));
+        write.write(GenericRow.of(22, 2201L));
+        commit.commit(1, write.prepareCommit(false, 1));
+
+        List<DataSplit> dataSplits = reader.read().dataSplits();
+        assertThat(dataSplits).hasSize(1);
+        DataSplit dataSplit = dataSplits.get(0);
+        assertThat(dataSplit.dataFiles()).hasSize(1);
+        DataFileMeta meta = dataSplit.dataFiles().get(0);
+        assertThat(dataSplit.indexFiles())
+                .hasValue(
+                        Collections.singletonList(
+                                new IndexFile(
+                                        String.format(
+                                                "%s/bucket-0/%s" + 
INDEX_PATH_SUFFIX,
+                                                tablePath,
+                                                meta.fileName()))));
+
+        // change file schema
+
+        write.close();
+        SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
+        schemaManager.commitChanges(SchemaChange.addColumn("v2", 
DataTypes.STRING()));
+        table = table.copyWithLatestSchema();
+        write = table.newWrite(commitUser);
+
+        // write another file
+
+        write.write(GenericRow.of(11, 1102L, 
BinaryString.fromString("eleven")));
+        write.write(GenericRow.of(12, 1202L, 
BinaryString.fromString("twelve")));
+        write.write(GenericRow.of(21, 2102L, 
BinaryString.fromString("twenty-one")));
+        write.write(GenericRow.of(22, 2202L, 
BinaryString.fromString("twenty-two")));
+        commit.commit(2, write.prepareCommit(false, 2));
+
+        dataSplits = reader.read().dataSplits();
+        assertThat(dataSplits).hasSize(1);
+        dataSplit = dataSplits.get(0);
+        assertThat(dataSplit.dataFiles()).hasSize(2);
+        DataFileMeta meta0 = dataSplit.dataFiles().get(0);
+        DataFileMeta meta1 = dataSplit.dataFiles().get(1);
+        assertThat(dataSplit.indexFiles())
+                .hasValue(
+                        Arrays.asList(
+                                new IndexFile(
+                                        String.format(
+                                                "%s/bucket-0/%s" + 
INDEX_PATH_SUFFIX,
+                                                tablePath,
+                                                meta0.fileName())),
+                                new IndexFile(
+                                        String.format(
+                                                "%s/bucket-0/%s" + 
INDEX_PATH_SUFFIX,
+                                                tablePath,
+                                                meta1.fileName()))));
+
+        write.close();
+        commit.close();
+    }
+
     private FileStoreTable createFileStoreTable(
             RowType rowType, List<String> partitionKeys, List<String> 
primaryKeys)
             throws Exception {
@@ -265,6 +344,14 @@ public class SnapshotReaderTest {
         Map<String, String> formatPerLevel = new HashMap<>();
         formatPerLevel.put("5", "orc");
         options.set(CoreOptions.FILE_FORMAT_PER_LEVEL, formatPerLevel);
+        // test read with extra files
+        options.set(
+                CoreOptions.FILE_INDEX
+                        + "."
+                        + BloomFilterFileIndex.BLOOM_FILTER
+                        + "."
+                        + CoreOptions.COLUMNS,
+                rowType.getFieldNames().get(0));
 
         SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
         TableSchema tableSchema =
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/PlaceholderSplit.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/PlaceholderSplit.java
index d6c172fed..070b6f6b8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/PlaceholderSplit.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/PlaceholderSplit.java
@@ -34,7 +34,9 @@ import java.util.Objects;
  * contain any {@link org.apache.paimon.table.source.Split}.
  */
 public class PlaceholderSplit extends DataSplit {
+
     private static final long serialVersionUID = 3L;
+    private static final String NO_USE_BUCKET_PATH = "/no-used";
 
     private final DataSplit dataSplit;
 
@@ -47,6 +49,8 @@ public class PlaceholderSplit extends DataSplit {
                         .withDataFiles(Collections.emptyList())
                         .withPartition(BinaryRow.EMPTY_ROW)
                         .isStreaming(true)
+                        .rawConvertible(false)
+                        .withBucketPath(NO_USE_BUCKET_PATH)
                         .build();
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
index b41da234e..a3b327027 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
@@ -103,6 +103,7 @@ public class SortCompactActionForUnawareBucketITCase 
extends ActionITCaseBase {
                         .withPartition(entry.partition())
                         .withBucket(entry.bucket())
                         .withDataFiles(Collections.singletonList(entry.file()))
+                        .withBucketPath("not used")
                         .build();
 
         final AtomicInteger i = new AtomicInteger(Integer.MIN_VALUE);
@@ -128,6 +129,7 @@ public class SortCompactActionForUnawareBucketITCase 
extends ActionITCaseBase {
                         .withPartition(entry.partition())
                         .withBucket(entry.bucket())
                         .withDataFiles(Collections.singletonList(entry.file()))
+                        .withBucketPath("not used")
                         .build();
 
         i.set(Integer.MIN_VALUE);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index 1a3f42424..77572bf01 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -837,6 +837,8 @@ public class ContinuousFileSplitEnumeratorTest extends 
FileSplitEnumeratorTestBa
                 .withBucket(bucket)
                 .withDataFiles(files)
                 .isStreaming(true)
+                .rawConvertible(false)
+                .withBucketPath("") // not used
                 .build();
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileSplitEnumeratorTestBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileSplitEnumeratorTestBase.java
index 419a8c478..446cbec0e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileSplitEnumeratorTestBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileSplitEnumeratorTestBase.java
@@ -61,6 +61,7 @@ public abstract class FileSplitEnumeratorTestBase {
                         .withBucket(bucket)
                         .withDataFiles(files)
                         .isStreaming(true)
+                        .withBucketPath("/temp/xxx") // not used
                         .build(),
                 0);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
index ef364a1ed..e32ddbd7e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
@@ -122,6 +122,8 @@ public class FileStoreSourceSplitGeneratorTest {
                 .withBucket(bucket)
                 .isStreaming(false)
                 .withDataFiles(metas)
+                .rawConvertible(false)
+                .withBucketPath("/") // not used
                 .build();
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
index 8d7d4c04e..e4f9473eb 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
@@ -126,6 +126,8 @@ public class FileStoreSourceSplitSerializerTest {
                         .withBucket(bucket)
                         .withDataFiles(files)
                         .isStreaming(isIncremental)
+                        .rawConvertible(false)
+                        .withBucketPath("/temp/" + bucket) // not used
                         .build();
         return new FileStoreSourceSplit(id, split, recordsToSkip);
     }
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
index 7d152c444..755b2482f 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
@@ -76,6 +76,8 @@ public class PaimonInputSplitTest {
                                         .filter(d -> 
d.partition.equals(wantedPartition))
                                         .map(d -> d.meta)
                                         .collect(Collectors.toList()))
+                        .rawConvertible(false)
+                        .withBucketPath("not used")
                         .build();
         PaimonInputSplit split = new PaimonInputSplit(tempDir.toString(), 
dataSplit, null);
 
diff --git a/paimon-spark/paimon-spark-common/pom.xml 
b/paimon-spark/paimon-spark-common/pom.xml
index 9cca628cb..e435174f8 100644
--- a/paimon-spark/paimon-spark-common/pom.xml
+++ b/paimon-spark/paimon-spark-common/pom.xml
@@ -135,10 +135,6 @@ under the License.
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-log4j12</artifactId>
                 </exclusion>
-                <exclusion>
-                    <groupId>org.apache.orc</groupId>
-                    <artifactId>orc-core</artifactId>
-                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
index 01a2ca165..556bd82cd 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
@@ -64,25 +64,22 @@ trait ScanHelper {
     var currentSplit: Option[DataSplit] = None
     val currentDataFiles = new ArrayBuffer[DataFileMeta]
     val currentDeletionFiles = new ArrayBuffer[DeletionFile]
-    val currentRawFiles = new ArrayBuffer[RawFile]
     var currentSize = 0L
 
     def closeDataSplit(): Unit = {
       if (currentSplit.nonEmpty && currentDataFiles.nonEmpty) {
         val newSplit =
-          copyDataSplit(currentSplit.get, currentDataFiles, 
currentDeletionFiles, currentRawFiles)
+          copyDataSplit(currentSplit.get, currentDataFiles, 
currentDeletionFiles)
         newSplits += newSplit
       }
       currentDataFiles.clear()
       currentDeletionFiles.clear()
-      currentRawFiles.clear()
       currentSize = 0
     }
 
     splits.foreach {
       split =>
         currentSplit = Some(split)
-        val hasRawFiles = split.convertToRawFiles().isPresent
 
         split.dataFiles().asScala.zipWithIndex.foreach {
           case (file, idx) =>
@@ -94,9 +91,6 @@ trait ScanHelper {
             if (deletionVectors) {
               currentDeletionFiles += split.deletionFiles().get().get(idx)
             }
-            if (hasRawFiles) {
-              currentRawFiles += split.convertToRawFiles().get().get(idx)
-            }
         }
         closeDataSplit()
     }
@@ -115,15 +109,15 @@ trait ScanHelper {
   private def copyDataSplit(
       split: DataSplit,
       dataFiles: Seq[DataFileMeta],
-      deletionFiles: Seq[DeletionFile],
-      rawFiles: Seq[RawFile]): DataSplit = {
+      deletionFiles: Seq[DeletionFile]): DataSplit = {
     val builder = DataSplit
       .builder()
       .withSnapshot(split.snapshotId())
       .withPartition(split.partition())
       .withBucket(split.bucket())
       .withDataFiles(dataFiles.toList.asJava)
-      .rawFiles(rawFiles.toList.asJava)
+      .rawConvertible(split.rawConvertible())
+      .withBucketPath(split.getBucketPath)
     if (deletionVectors) {
       builder.withDataDeletionFiles(deletionFiles.toList.asJava)
     }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 95efb7a23..0ac0b14bb 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -120,7 +120,10 @@ case class DeleteFromPaimonTableCommand(
     }
 
     // Step4: build a dataframe that contains the unchanged data, and write 
out them.
-    val touchedDataSplits = SparkDataFileMeta.convertToDataSplits(touchedFiles)
+    val touchedDataSplits = SparkDataFileMeta.convertToDataSplits(
+      touchedFiles,
+      rawConvertible = true,
+      table.store().pathFactory())
     val toRewriteScanRelation = Filter(
       Not(condition),
       Compatibility.createDataSourceV2ScanRelation(
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
index cb3266157..8299f6b8c 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
@@ -48,7 +48,10 @@ object SparkDataFileMeta {
     }
   }
 
-  def convertToDataSplits(sparkDataFiles: Array[SparkDataFileMeta]): 
Array[DataSplit] = {
+  def convertToDataSplits(
+      sparkDataFiles: Array[SparkDataFileMeta],
+      rawConvertible: Boolean,
+      pathFactory: FileStorePathFactory): Array[DataSplit] = {
     sparkDataFiles
       .groupBy(file => (file.partition, file.bucket))
       .map {
@@ -57,6 +60,8 @@ object SparkDataFileMeta {
             .withPartition(partition)
             .withBucket(bucket)
             .withDataFiles(files.map(_.dataFileMeta).toList.asJava)
+            .rawConvertible(rawConvertible)
+            .withBucketPath(pathFactory.bucketPath(partition, bucket).toString)
             .build()
       }
       .toArray
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index 6c16ce5e8..ee41c5ebb 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -102,7 +102,11 @@ case class UpdatePaimonTableCommand(
           }
           new Column(updated).as(origin.name, origin.metadata)
       }
-      val touchedDataSplits = 
SparkDataFileMeta.convertToDataSplits(touchedFiles)
+      // Append-only files always set rawConvertible to true.
+      val touchedDataSplits = SparkDataFileMeta.convertToDataSplits(
+        touchedFiles,
+        rawConvertible = true,
+        table.store().pathFactory())
       val toUpdateScanRelation = DataSourceV2ScanRelation(
         relation,
         PaimonSplitScan(table, touchedDataSplits),
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
index c6bda9e7c..7b150c1fc 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
@@ -38,13 +38,10 @@ class ScanHelperTest extends PaimonSparkTestBase {
       val fileNum = 100
 
       val files = scala.collection.mutable.ListBuffer.empty[DataFileMeta]
-      val rawFiles = scala.collection.mutable.ListBuffer.empty[RawFile]
       0.until(fileNum).foreach {
         i =>
           val path = s"f$i.parquet"
           files += DataFileMeta.forAppend(path, 750000, 30000, null, 0, 29999, 
1)
-
-          rawFiles += new RawFile(s"/a/b/$path", 0, 75000, "parquet", 0, 30000)
       }
 
       val dataSplits = mutable.ArrayBuffer.empty[Split]
@@ -56,7 +53,8 @@ class ScanHelperTest extends PaimonSparkTestBase {
             .withBucket(0)
             .withPartition(new BinaryRow(0))
             .withDataFiles(files.zipWithIndex.filter(_._2 % splitNum == 
i).map(_._1).toList.asJava)
-            .rawFiles(rawFiles.zipWithIndex.filter(_._2 % splitNum == 
i).map(_._1).toList.asJava)
+            .rawConvertible(true)
+            .withBucketPath("no use")
             .build()
       }
 


Reply via email to