This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 01f0764ae [core] Expose `IndexFile` in `Split`. (#3226)
01f0764ae is described below
commit 01f0764aef662a7086e4476b446acf8fa965b96e
Author: YeJunHao <[email protected]>
AuthorDate: Fri Apr 26 09:44:28 2024 +0800
[core] Expose `IndexFile` in `Split`. (#3226)
---
.../paimon/operation/AppendOnlyFileStoreWrite.java | 5 +
.../org/apache/paimon/table/source/DataSplit.java | 105 ++++++++++++++++-----
.../org/apache/paimon/table/source/IndexFile.java | 54 +++++++++++
.../java/org/apache/paimon/table/source/Split.java | 10 ++
.../snapshot/IncrementalStartingScanner.java | 87 ++++++++++++++---
.../table/source/snapshot/SnapshotReaderImpl.java | 37 ++------
.../test/java/org/apache/paimon/TestFileStore.java | 2 +
.../paimon/crosspartition/IndexBootstrapTest.java | 1 +
.../paimon/operation/MergeFileSplitReadTest.java | 1 +
.../org/apache/paimon/table/source/SplitTest.java | 1 +
.../table/source/snapshot/SnapshotReaderTest.java | 87 +++++++++++++++++
.../flink/source/align/PlaceholderSplit.java | 4 +
.../SortCompactActionForUnawareBucketITCase.java | 2 +
.../source/ContinuousFileSplitEnumeratorTest.java | 2 +
.../flink/source/FileSplitEnumeratorTestBase.java | 1 +
.../source/FileStoreSourceSplitGeneratorTest.java | 2 +
.../source/FileStoreSourceSplitSerializerTest.java | 2 +
.../paimon/hive/mapred/PaimonInputSplitTest.java | 2 +
paimon-spark/paimon-spark-common/pom.xml | 4 -
.../scala/org/apache/paimon/spark/ScanHelper.scala | 14 +--
.../commands/DeleteFromPaimonTableCommand.scala | 5 +-
.../paimon/spark/commands/SparkDataFileMeta.scala | 7 +-
.../spark/commands/UpdatePaimonTableCommand.scala | 6 +-
.../org/apache/paimon/spark/ScanHelperTest.scala | 6 +-
24 files changed, 363 insertions(+), 84 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index fc3f2a3d6..bb3074ac7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -197,6 +197,11 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
.withPartition(partition)
.withBucket(bucket)
.withDataFiles(files)
+ .rawConvertible(true)
+ .withBucketPath(
+ pathFactory
+ .bucketPath(partition,
bucket)
+ .toString())
.build()));
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index fbd5f723e..027aa0662 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -33,12 +33,13 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
+import java.util.stream.Collectors;
+import static org.apache.paimon.io.DataFilePathFactory.INDEX_PATH_SUFFIX;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Input splits. Needed by most batch computation engines. */
@@ -56,7 +57,8 @@ public class DataSplit implements Split {
private List<DataFileMeta> dataFiles;
@Nullable private List<DeletionFile> dataDeletionFiles;
- private List<RawFile> rawFiles = Collections.emptyList();
+ private boolean rawConvertible;
+ private String bucketPath;
public DataSplit() {}
@@ -93,6 +95,14 @@ public class DataSplit implements Split {
return isStreaming;
}
+ public boolean rawConvertible() {
+ return rawConvertible;
+ }
+
+ public String getBucketPath() {
+ return bucketPath;
+ }
+
public OptionalLong getLatestFileCreationEpochMillis() {
return
this.dataFiles.stream().mapToLong(DataFileMeta::creationTimeEpochMillis).max();
}
@@ -108,13 +118,61 @@ public class DataSplit implements Split {
@Override
public Optional<List<RawFile>> convertToRawFiles() {
- if (rawFiles.isEmpty()) {
- return Optional.empty();
+ if (rawConvertible) {
+ return Optional.of(
+ dataFiles.stream()
+ .map(f -> makeRawTableFile(bucketPath, f))
+ .collect(Collectors.toList()));
} else {
- return Optional.of(rawFiles);
+ return Optional.empty();
}
}
+ private RawFile makeRawTableFile(String bucketPath, DataFileMeta meta) {
+ return new RawFile(
+ bucketPath + "/" + meta.fileName(),
+ 0,
+ meta.fileSize(),
+ meta.fileFormat()
+ .map(t -> t.toString().toLowerCase())
+ .orElseThrow(
+ () ->
+ new RuntimeException(
+ "Can't find format from file: "
+ + bucketPath
+ + "/"
+ + meta.fileName())),
+ meta.schemaId(),
+ meta.rowCount());
+ }
+
+ @Override
+ @Nullable
+ public Optional<List<IndexFile>> indexFiles() {
+ List<IndexFile> indexFiles = new ArrayList<>();
+ boolean hasIndexFile = false;
+ for (DataFileMeta file : dataFiles) {
+ List<String> exFiles =
+ file.extraFiles().stream()
+ .filter(s -> s.endsWith(INDEX_PATH_SUFFIX))
+ .collect(Collectors.toList());
+ if (exFiles.isEmpty()) {
+ indexFiles.add(null);
+ } else if (exFiles.size() == 1) {
+ hasIndexFile = true;
+ indexFiles.add(new IndexFile(bucketPath + "/" +
exFiles.get(0)));
+ } else {
+ throw new RuntimeException(
+ "Wrong number of file index for file "
+ + file.fileName()
+ + " index files: "
+ + String.join(",", exFiles));
+ }
+ }
+
+ return hasIndexFile ? Optional.of(indexFiles) : Optional.empty();
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -131,7 +189,8 @@ public class DataSplit implements Split {
&& Objects.equals(dataFiles, split.dataFiles)
&& Objects.equals(dataDeletionFiles, split.dataDeletionFiles)
&& isStreaming == split.isStreaming
- && Objects.equals(rawFiles, split.rawFiles);
+ && rawConvertible == split.rawConvertible
+ && Objects.equals(bucketPath, split.bucketPath);
}
@Override
@@ -144,7 +203,8 @@ public class DataSplit implements Split {
dataFiles,
dataDeletionFiles,
isStreaming,
- rawFiles);
+ rawConvertible,
+ bucketPath);
}
private void writeObject(ObjectOutputStream out) throws IOException {
@@ -164,7 +224,8 @@ public class DataSplit implements Split {
this.dataFiles = other.dataFiles;
this.dataDeletionFiles = other.dataDeletionFiles;
this.isStreaming = other.isStreaming;
- this.rawFiles = other.rawFiles;
+ this.rawConvertible = other.rawConvertible;
+ this.bucketPath = other.bucketPath;
}
public void serialize(DataOutputView out) throws IOException {
@@ -189,10 +250,8 @@ public class DataSplit implements Split {
out.writeBoolean(isStreaming);
- out.writeInt(rawFiles.size());
- for (RawFile rawFile : rawFiles) {
- rawFile.serialize(out);
- }
+ out.writeBoolean(rawConvertible);
+ out.writeUTF(bucketPath);
}
public static DataSplit deserialize(DataInputView in) throws IOException {
@@ -218,12 +277,8 @@ public class DataSplit implements Split {
List<DeletionFile> dataDeletionFiles =
DeletionFile.deserializeList(in);
boolean isStreaming = in.readBoolean();
-
- int rawFileNum = in.readInt();
- List<RawFile> rawFiles = new ArrayList<>();
- for (int i = 0; i < rawFileNum; i++) {
- rawFiles.add(RawFile.deserialize(in));
- }
+ boolean rawConvertible = in.readBoolean();
+ String bucketPath = in.readUTF();
DataSplit.Builder builder =
builder()
@@ -233,7 +288,9 @@ public class DataSplit implements Split {
.withBeforeFiles(beforeFiles)
.withDataFiles(dataFiles)
.isStreaming(isStreaming)
- .rawFiles(rawFiles);
+ .rawConvertible(rawConvertible)
+ .withBucketPath(bucketPath);
+
if (beforeDeletionFiles != null) {
builder.withBeforeDeletionFiles(beforeDeletionFiles);
}
@@ -292,8 +349,13 @@ public class DataSplit implements Split {
return this;
}
- public Builder rawFiles(List<RawFile> rawFiles) {
- this.split.rawFiles = rawFiles;
+ public Builder rawConvertible(boolean rawConvertible) {
+ this.split.rawConvertible = rawConvertible;
+ return this;
+ }
+
+ public Builder withBucketPath(String bucketPath) {
+ this.split.bucketPath = bucketPath;
return this;
}
@@ -301,6 +363,7 @@ public class DataSplit implements Split {
checkArgument(split.partition != null);
checkArgument(split.bucket != -1);
checkArgument(split.dataFiles != null);
+ checkArgument(split.bucketPath != null);
DataSplit split = new DataSplit();
split.assign(this.split);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/IndexFile.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/IndexFile.java
new file mode 100644
index 000000000..1c7814d4e
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/IndexFile.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.io.DataOutputView;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/** Index file for data file. */
+public class IndexFile {
+
+ private final String path;
+
+ public IndexFile(String path) {
+ this.path = path;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof IndexFile)) {
+ return false;
+ }
+
+ IndexFile other = (IndexFile) o;
+ return Objects.equals(path, other.path);
+ }
+
+ public void serialize(DataOutputView out) throws IOException {
+ out.writeUTF(path);
+ }
+
+ public static IndexFile deserialize(DataInputView in) throws IOException {
+ String path = in.readUTF();
+ return new IndexFile(path);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java
index adefb868f..d6cb381e2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/Split.java
@@ -51,4 +51,14 @@ public interface Split extends Serializable {
default Optional<List<DeletionFile>> deletionFiles() {
return Optional.empty();
}
+
+ /**
+ * * Return the index file of the data file, for example, bloom-filter index. All the type of
+ * indexes and columns will be stored in one single index file.
+ *
+ * <p>If there is no corresponding index file, the element will be null.
+ */
+ default Optional<List<IndexFile>> indexFiles() {
+ return Optional.empty();
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
index 602a6370a..86705ce44 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
@@ -23,17 +23,22 @@ import org.apache.paimon.Snapshot.CommitKind;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.table.source.PlanImpl;
import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
-import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
/** {@link StartingScanner} for incremental changes by snapshot. */
public class IncrementalStartingScanner extends AbstractStartingScanner {
@@ -52,34 +57,49 @@ public class IncrementalStartingScanner extends
AbstractStartingScanner {
@Override
public Result scan(SnapshotReader reader) {
- Map<Pair<BinaryRow, Integer>, List<DataFileMeta>> grouped = new
HashMap<>();
+ Map<SplitInfo, List<DataFileMeta>> grouped = new HashMap<>();
for (long i = startingSnapshotId + 1; i < endingSnapshotId + 1; i++) {
List<DataSplit> splits = readSplits(reader,
snapshotManager.snapshot(i));
for (DataSplit split : splits) {
grouped.computeIfAbsent(
- Pair.of(split.partition(), split.bucket()), k
-> new ArrayList<>())
+ new SplitInfo(
+ split.partition(),
+ split.bucket(),
+ // take it for false, because multiple snapshot read may
+ // need merge for primary key table
+ false,
+ split.getBucketPath(),
+ split.deletionFiles().orElse(null)),
+ k -> new ArrayList<>())
.addAll(split.dataFiles());
}
}
- List<DataSplit> result = new ArrayList<>();
- for (Map.Entry<Pair<BinaryRow, Integer>, List<DataFileMeta>> entry :
grouped.entrySet()) {
- BinaryRow partition = entry.getKey().getLeft();
- int bucket = entry.getKey().getRight();
+ List<Split> result = new ArrayList<>();
+ for (Map.Entry<SplitInfo, List<DataFileMeta>> entry :
grouped.entrySet()) {
+ BinaryRow partition = entry.getKey().partition;
+ int bucket = entry.getKey().bucket;
+ boolean rawConvertible = entry.getKey().rawConvertible;
+ String bucketPath = entry.getKey().bucketPath;
+ List<DeletionFile> deletionFiles = entry.getKey().deletionFiles;
for (SplitGenerator.SplitGroup splitGroup :
reader.splitGenerator().splitForBatch(entry.getValue())) {
- // TODO pass deletion files
- result.add(
+ DataSplit.Builder dataSplitBuilder =
DataSplit.builder()
.withSnapshot(endingSnapshotId)
.withPartition(partition)
.withBucket(bucket)
.withDataFiles(splitGroup.files)
- .build());
+ .rawConvertible(rawConvertible)
+ .withBucketPath(bucketPath);
+ if (deletionFiles != null) {
+ dataSplitBuilder.withDataDeletionFiles(deletionFiles);
+ }
+ result.add(dataSplitBuilder.build());
}
}
- return StartingScanner.fromPlan(new PlanImpl(null, endingSnapshotId,
(List) result));
+ return StartingScanner.fromPlan(new PlanImpl(null, endingSnapshotId,
result));
}
private List<DataSplit> readSplits(SnapshotReader reader, Snapshot s) {
@@ -110,4 +130,49 @@ public class IncrementalStartingScanner extends
AbstractStartingScanner {
}
return (List)
reader.withSnapshot(s).withMode(ScanMode.CHANGELOG).read().splits();
}
+
+ /** Split information to pass. */
+ private static class SplitInfo {
+
+ private final BinaryRow partition;
+ private final int bucket;
+ private final boolean rawConvertible;
+ private final String bucketPath;
+ @Nullable private final List<DeletionFile> deletionFiles;
+
+ private SplitInfo(
+ BinaryRow partition,
+ int bucket,
+ boolean rawConvertible,
+ String bucketPath,
+ @Nullable List<DeletionFile> deletionFiles) {
+ this.partition = partition;
+ this.bucket = bucket;
+ this.rawConvertible = rawConvertible;
+ this.bucketPath = bucketPath;
+ this.deletionFiles = deletionFiles;
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(
+ new Object[] {partition, bucket, rawConvertible,
bucketPath, deletionFiles});
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+
+ if (!(obj instanceof SplitInfo)) {
+ return false;
+ }
+
+ SplitInfo that = (SplitInfo) obj;
+
+ return Objects.equals(partition, that.partition)
+ && bucket == that.bucket
+ && rawConvertible == that.rawConvertible
+ && Objects.equals(bucketPath, that.bucketPath)
+ && Objects.equals(deletionFiles, that.deletionFiles);
+ }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 205dfb3a1..c36427298 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -40,7 +40,6 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.table.source.PlanImpl;
-import org.apache.paimon.table.source.RawFile;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.types.RowType;
@@ -290,11 +289,10 @@ public class SnapshotReaderImpl implements SnapshotReader
{
: null;
for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
List<DataFileMeta> dataFiles = splitGroup.files;
- builder.withDataFiles(dataFiles);
- builder.rawFiles(
- splitGroup.rawConvertible
- ? convertToRawFiles(partition, bucket,
dataFiles)
- : Collections.emptyList());
+ String bucketPath = pathFactory.bucketPath(partition,
bucket).toString();
+ builder.withDataFiles(dataFiles)
+ .rawConvertible(splitGroup.rawConvertible)
+ .withBucketPath(bucketPath);
if (deletionVectors) {
builder.withDataDeletionFiles(
getDeletionFiles(dataFiles,
deletionIndexFile));
@@ -371,7 +369,8 @@ public class SnapshotReaderImpl implements SnapshotReader {
.withBucket(bucket)
.withBeforeFiles(before)
.withDataFiles(data)
- .isStreaming(isStreaming);
+ .isStreaming(isStreaming)
+ .withBucketPath(pathFactory.bucketPath(part,
bucket).toString());
if (deletionVectors) {
IndexFileMeta beforeDeletionIndex =
indexFileHandler
@@ -433,28 +432,4 @@ public class SnapshotReaderImpl implements SnapshotReader {
return deletionFiles;
}
-
- private List<RawFile> convertToRawFiles(
- BinaryRow partition, int bucket, List<DataFileMeta> dataFiles) {
- String bucketPath = pathFactory.bucketPath(partition,
bucket).toString();
- return dataFiles.stream()
- .map(f -> makeRawTableFile(bucketPath, f))
- .collect(Collectors.toList());
- }
-
- private RawFile makeRawTableFile(String bucketPath, DataFileMeta meta) {
- return new RawFile(
- bucketPath + "/" + meta.fileName(),
- 0,
- meta.fileSize(),
- meta.fileFormat()
- .map(t -> t.toString().toLowerCase())
- .orElse(
- new CoreOptions(tableSchema.options())
- .formatType()
- .toString()
- .toLowerCase()),
- meta.schemaId(),
- meta.rowCount());
- }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 6adc3aff0..e71f5d281 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -439,6 +439,8 @@ public class TestFileStore extends KeyValueFileStore {
.withBucket(entryWithBucket.getKey())
.withDataFiles(entryWithBucket.getValue())
.isStreaming(isStreaming)
+ .rawConvertible(false)
+ .withBucketPath("not used")
.build()));
while (iterator.hasNext()) {
kvs.add(iterator.next().copy(keySerializer,
valueSerializer));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
index 3eff9b7cd..b8950772a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
@@ -117,6 +117,7 @@ public class IndexBootstrapTest extends TableTestBase {
.withPartition(EMPTY_ROW)
.withBucket(0)
.withDataFiles(Arrays.asList(files))
+ .withBucketPath("") // not used
.build();
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
index 806c869f9..5652fcd43 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
@@ -246,6 +246,7 @@ public class MergeFileSplitReadTest {
entry.getValue().stream()
.map(ManifestEntry::file)
.collect(Collectors.toList()))
+ .withBucketPath("not used")
.build());
RecordReaderIterator<KeyValue> actualIterator = new
RecordReaderIterator<>(reader);
while (actualIterator.hasNext()) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
index 7fbd8f96d..b5dda8df7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
@@ -50,6 +50,7 @@ public class SplitTest {
.withPartition(data.partition)
.withBucket(data.bucket)
.withDataFiles(files)
+ .withBucketPath("my path")
.build();
ByteArrayOutputStream out = new ByteArrayOutputStream();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java
index a34f16a03..5788e9722 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndex;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
@@ -38,6 +39,7 @@ import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.IndexFile;
import org.apache.paimon.table.source.RawFile;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -55,6 +57,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
+import static org.apache.paimon.io.DataFilePathFactory.INDEX_PATH_SUFFIX;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link SnapshotReader}. */
@@ -255,6 +258,82 @@ public class SnapshotReaderTest {
commit.close();
}
+ @Test
+ public void testGetAppendOnlyIndexFiles() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
+ new String[] {"k", "v"});
+ FileStoreTable table =
+ createFileStoreTable(rowType, Collections.emptyList(),
Collections.emptyList());
+
+ String commitUser = UUID.randomUUID().toString();
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+ SnapshotReader reader = table.newSnapshotReader();
+
+ // write one file
+
+ write.write(GenericRow.of(11, 1101L));
+ write.write(GenericRow.of(12, 1201L));
+ write.write(GenericRow.of(21, 2101L));
+ write.write(GenericRow.of(22, 2201L));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ List<DataSplit> dataSplits = reader.read().dataSplits();
+ assertThat(dataSplits).hasSize(1);
+ DataSplit dataSplit = dataSplits.get(0);
+ assertThat(dataSplit.dataFiles()).hasSize(1);
+ DataFileMeta meta = dataSplit.dataFiles().get(0);
+ assertThat(dataSplit.indexFiles())
+ .hasValue(
+ Collections.singletonList(
+ new IndexFile(
+ String.format(
+ "%s/bucket-0/%s" +
INDEX_PATH_SUFFIX,
+ tablePath,
+ meta.fileName()))));
+
+ // change file schema
+
+ write.close();
+ SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
+ schemaManager.commitChanges(SchemaChange.addColumn("v2",
DataTypes.STRING()));
+ table = table.copyWithLatestSchema();
+ write = table.newWrite(commitUser);
+
+ // write another file
+
+ write.write(GenericRow.of(11, 1102L,
BinaryString.fromString("eleven")));
+ write.write(GenericRow.of(12, 1202L,
BinaryString.fromString("twelve")));
+ write.write(GenericRow.of(21, 2102L,
BinaryString.fromString("twenty-one")));
+ write.write(GenericRow.of(22, 2202L,
BinaryString.fromString("twenty-two")));
+ commit.commit(2, write.prepareCommit(false, 2));
+
+ dataSplits = reader.read().dataSplits();
+ assertThat(dataSplits).hasSize(1);
+ dataSplit = dataSplits.get(0);
+ assertThat(dataSplit.dataFiles()).hasSize(2);
+ DataFileMeta meta0 = dataSplit.dataFiles().get(0);
+ DataFileMeta meta1 = dataSplit.dataFiles().get(1);
+ assertThat(dataSplit.indexFiles())
+ .hasValue(
+ Arrays.asList(
+ new IndexFile(
+ String.format(
+ "%s/bucket-0/%s" +
INDEX_PATH_SUFFIX,
+ tablePath,
+ meta0.fileName())),
+ new IndexFile(
+ String.format(
+ "%s/bucket-0/%s" +
INDEX_PATH_SUFFIX,
+ tablePath,
+ meta1.fileName()))));
+
+ write.close();
+ commit.close();
+ }
+
private FileStoreTable createFileStoreTable(
RowType rowType, List<String> partitionKeys, List<String>
primaryKeys)
throws Exception {
@@ -265,6 +344,14 @@ public class SnapshotReaderTest {
Map<String, String> formatPerLevel = new HashMap<>();
formatPerLevel.put("5", "orc");
options.set(CoreOptions.FILE_FORMAT_PER_LEVEL, formatPerLevel);
+ // test read with extra files
+ options.set(
+ CoreOptions.FILE_INDEX
+ + "."
+ + BloomFilterFileIndex.BLOOM_FILTER
+ + "."
+ + CoreOptions.COLUMNS,
+ rowType.getFieldNames().get(0));
SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
TableSchema tableSchema =
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/PlaceholderSplit.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/PlaceholderSplit.java
index d6c172fed..070b6f6b8 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/PlaceholderSplit.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/PlaceholderSplit.java
@@ -34,7 +34,9 @@ import java.util.Objects;
* contain any {@link org.apache.paimon.table.source.Split}.
*/
public class PlaceholderSplit extends DataSplit {
+
private static final long serialVersionUID = 3L;
+ private static final String NO_USE_BUCKET_PATH = "/no-used";
private final DataSplit dataSplit;
@@ -47,6 +49,8 @@ public class PlaceholderSplit extends DataSplit {
.withDataFiles(Collections.emptyList())
.withPartition(BinaryRow.EMPTY_ROW)
.isStreaming(true)
+ .rawConvertible(false)
+ .withBucketPath(NO_USE_BUCKET_PATH)
.build();
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
index b41da234e..a3b327027 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
@@ -103,6 +103,7 @@ public class SortCompactActionForUnawareBucketITCase
extends ActionITCaseBase {
.withPartition(entry.partition())
.withBucket(entry.bucket())
.withDataFiles(Collections.singletonList(entry.file()))
+ .withBucketPath("not used")
.build();
final AtomicInteger i = new AtomicInteger(Integer.MIN_VALUE);
@@ -128,6 +129,7 @@ public class SortCompactActionForUnawareBucketITCase
extends ActionITCaseBase {
.withPartition(entry.partition())
.withBucket(entry.bucket())
.withDataFiles(Collections.singletonList(entry.file()))
+ .withBucketPath("not used")
.build();
i.set(Integer.MIN_VALUE);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index 1a3f42424..77572bf01 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -837,6 +837,8 @@ public class ContinuousFileSplitEnumeratorTest extends
FileSplitEnumeratorTestBa
.withBucket(bucket)
.withDataFiles(files)
.isStreaming(true)
+ .rawConvertible(false)
+ .withBucketPath("") // not used
.build();
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileSplitEnumeratorTestBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileSplitEnumeratorTestBase.java
index 419a8c478..446cbec0e 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileSplitEnumeratorTestBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileSplitEnumeratorTestBase.java
@@ -61,6 +61,7 @@ public abstract class FileSplitEnumeratorTestBase {
.withBucket(bucket)
.withDataFiles(files)
.isStreaming(true)
+ .withBucketPath("/temp/xxx") // not used
.build(),
0);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
index ef364a1ed..e32ddbd7e 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
@@ -122,6 +122,8 @@ public class FileStoreSourceSplitGeneratorTest {
.withBucket(bucket)
.isStreaming(false)
.withDataFiles(metas)
+ .rawConvertible(false)
+ .withBucketPath("/") // not used
.build();
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
index 8d7d4c04e..e4f9473eb 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java
@@ -126,6 +126,8 @@ public class FileStoreSourceSplitSerializerTest {
.withBucket(bucket)
.withDataFiles(files)
.isStreaming(isIncremental)
+ .rawConvertible(false)
+ .withBucketPath("/temp/" + bucket) // no used
.build();
return new FileStoreSourceSplit(id, split, recordsToSkip);
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
index 7d152c444..755b2482f 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java
@@ -76,6 +76,8 @@ public class PaimonInputSplitTest {
.filter(d ->
d.partition.equals(wantedPartition))
.map(d -> d.meta)
.collect(Collectors.toList()))
+ .rawConvertible(false)
+ .withBucketPath("not used")
.build();
PaimonInputSplit split = new PaimonInputSplit(tempDir.toString(),
dataSplit, null);
diff --git a/paimon-spark/paimon-spark-common/pom.xml
b/paimon-spark/paimon-spark-common/pom.xml
index 9cca628cb..e435174f8 100644
--- a/paimon-spark/paimon-spark-common/pom.xml
+++ b/paimon-spark/paimon-spark-common/pom.xml
@@ -135,10 +135,6 @@ under the License.
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
- <exclusion>
- <groupId>org.apache.orc</groupId>
- <artifactId>orc-core</artifactId>
- </exclusion>
</exclusions>
</dependency>
<dependency>
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
index 01a2ca165..556bd82cd 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
@@ -64,25 +64,22 @@ trait ScanHelper {
var currentSplit: Option[DataSplit] = None
val currentDataFiles = new ArrayBuffer[DataFileMeta]
val currentDeletionFiles = new ArrayBuffer[DeletionFile]
- val currentRawFiles = new ArrayBuffer[RawFile]
var currentSize = 0L
def closeDataSplit(): Unit = {
if (currentSplit.nonEmpty && currentDataFiles.nonEmpty) {
val newSplit =
- copyDataSplit(currentSplit.get, currentDataFiles,
currentDeletionFiles, currentRawFiles)
+ copyDataSplit(currentSplit.get, currentDataFiles,
currentDeletionFiles)
newSplits += newSplit
}
currentDataFiles.clear()
currentDeletionFiles.clear()
- currentRawFiles.clear()
currentSize = 0
}
splits.foreach {
split =>
currentSplit = Some(split)
- val hasRawFiles = split.convertToRawFiles().isPresent
split.dataFiles().asScala.zipWithIndex.foreach {
case (file, idx) =>
@@ -94,9 +91,6 @@ trait ScanHelper {
if (deletionVectors) {
currentDeletionFiles += split.deletionFiles().get().get(idx)
}
- if (hasRawFiles) {
- currentRawFiles += split.convertToRawFiles().get().get(idx)
- }
}
closeDataSplit()
}
@@ -115,15 +109,15 @@ trait ScanHelper {
private def copyDataSplit(
split: DataSplit,
dataFiles: Seq[DataFileMeta],
- deletionFiles: Seq[DeletionFile],
- rawFiles: Seq[RawFile]): DataSplit = {
+ deletionFiles: Seq[DeletionFile]): DataSplit = {
val builder = DataSplit
.builder()
.withSnapshot(split.snapshotId())
.withPartition(split.partition())
.withBucket(split.bucket())
.withDataFiles(dataFiles.toList.asJava)
- .rawFiles(rawFiles.toList.asJava)
+ .rawConvertible(split.rawConvertible())
+ .withBucketPath(split.getBucketPath)
if (deletionVectors) {
builder.withDataDeletionFiles(deletionFiles.toList.asJava)
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 95efb7a23..0ac0b14bb 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -120,7 +120,10 @@ case class DeleteFromPaimonTableCommand(
}
// Step4: build a dataframe that contains the unchanged data, and write
out them.
- val touchedDataSplits = SparkDataFileMeta.convertToDataSplits(touchedFiles)
+ val touchedDataSplits = SparkDataFileMeta.convertToDataSplits(
+ touchedFiles,
+ rawConvertible = true,
+ table.store().pathFactory())
val toRewriteScanRelation = Filter(
Not(condition),
Compatibility.createDataSourceV2ScanRelation(
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
index cb3266157..8299f6b8c 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
@@ -48,7 +48,10 @@ object SparkDataFileMeta {
}
}
- def convertToDataSplits(sparkDataFiles: Array[SparkDataFileMeta]):
Array[DataSplit] = {
+ def convertToDataSplits(
+ sparkDataFiles: Array[SparkDataFileMeta],
+ rawConvertible: Boolean,
+ pathFactory: FileStorePathFactory): Array[DataSplit] = {
sparkDataFiles
.groupBy(file => (file.partition, file.bucket))
.map {
@@ -57,6 +60,8 @@ object SparkDataFileMeta {
.withPartition(partition)
.withBucket(bucket)
.withDataFiles(files.map(_.dataFileMeta).toList.asJava)
+ .rawConvertible(rawConvertible)
+ .withBucketPath(pathFactory.bucketPath(partition, bucket).toString)
.build()
}
.toArray
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index 6c16ce5e8..ee41c5ebb 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -102,7 +102,11 @@ case class UpdatePaimonTableCommand(
}
new Column(updated).as(origin.name, origin.metadata)
}
- val touchedDataSplits =
SparkDataFileMeta.convertToDataSplits(touchedFiles)
+ // Append-only files always set rawConvertible to true.
+ val touchedDataSplits = SparkDataFileMeta.convertToDataSplits(
+ touchedFiles,
+ rawConvertible = true,
+ table.store().pathFactory())
val toUpdateScanRelation = DataSourceV2ScanRelation(
relation,
PaimonSplitScan(table, touchedDataSplits),
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
index c6bda9e7c..7b150c1fc 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
@@ -38,13 +38,10 @@ class ScanHelperTest extends PaimonSparkTestBase {
val fileNum = 100
val files = scala.collection.mutable.ListBuffer.empty[DataFileMeta]
- val rawFiles = scala.collection.mutable.ListBuffer.empty[RawFile]
0.until(fileNum).foreach {
i =>
val path = s"f$i.parquet"
files += DataFileMeta.forAppend(path, 750000, 30000, null, 0, 29999,
1)
-
- rawFiles += new RawFile(s"/a/b/$path", 0, 75000, "parquet", 0, 30000)
}
val dataSplits = mutable.ArrayBuffer.empty[Split]
@@ -56,7 +53,8 @@ class ScanHelperTest extends PaimonSparkTestBase {
.withBucket(0)
.withPartition(new BinaryRow(0))
.withDataFiles(files.zipWithIndex.filter(_._2 % splitNum ==
i).map(_._1).toList.asJava)
- .rawFiles(rawFiles.zipWithIndex.filter(_._2 % splitNum ==
i).map(_._1).toList.asJava)
+ .rawConvertible(true)
+ .withBucketPath("no use")
.build()
}