This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8ff2537b2 [core] Introduce RawFileSplitRead to accelerate batch read for primary key table (#3209)
8ff2537b2 is described below
commit 8ff2537b2a0ac43365746193c59bdd6977e9394d
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 15 16:48:16 2024 +0800
[core] Introduce RawFileSplitRead to accelerate batch read for primary key table (#3209)
---
.../java/org/apache/paimon/utils/LazyField.java | 38 ++--
.../org/apache/paimon/AppendOnlyFileStore.java | 6 +-
.../src/main/java/org/apache/paimon/FileStore.java | 4 +-
.../java/org/apache/paimon/KeyValueFileStore.java | 17 +-
.../deletionvectors/ApplyDeletionVectorReader.java | 13 --
.../paimon/operation/AppendOnlyFileStoreRead.java | 192 ------------------
.../paimon/operation/AppendOnlyFileStoreWrite.java | 4 +-
...eFileStoreRead.java => MergeFileSplitRead.java} | 25 ++-
.../apache/paimon/operation/RawFileSplitRead.java | 216 +++++++++++++++++++++
.../{FileStoreRead.java => SplitRead.java} | 4 +-
.../paimon/table/AppendOnlyFileStoreTable.java | 12 +-
.../paimon/table/PrimaryKeyFileStoreTable.java | 23 +--
.../paimon/table/source/AbstractDataTableRead.java | 10 +-
.../paimon/table/source/KeyValueTableRead.java | 123 +++++++++---
.../org/apache/paimon/table/source/TableRead.java | 4 +-
.../test/java/org/apache/paimon/TestFileStore.java | 4 +-
...reReadTest.java => MergeFileSplitReadTest.java} | 6 +-
.../paimon/flink/action/MergeIntoActionITCase.java | 6 +-
.../flink/source/TestChangelogDataReadWrite.java | 46 ++---
.../paimon/flink/util/ReadWriteTableTestUtil.java | 22 ++-
20 files changed, 431 insertions(+), 344 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreRead.java
b/paimon-common/src/main/java/org/apache/paimon/utils/LazyField.java
similarity index 58%
copy from
paimon-core/src/main/java/org/apache/paimon/operation/FileStoreRead.java
copy to paimon-common/src/main/java/org/apache/paimon/utils/LazyField.java
index 2d3e121b1..2bb701362 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreRead.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/LazyField.java
@@ -16,23 +16,33 @@
* limitations under the License.
*/
-package org.apache.paimon.operation;
+package org.apache.paimon.utils;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.table.source.DataSplit;
+import java.util.function.Supplier;
-import java.io.IOException;
+/** A class to lazy initialized field. */
+public class LazyField<T> {
-/**
- * Read operation which provides {@link RecordReader} creation.
- *
- * @param <T> type of record to read.
- */
-public interface FileStoreRead<T> {
+ private final Supplier<T> supplier;
+
+ private boolean initialized;
+ private T value;
+
+ public LazyField(Supplier<T> supplier) {
+ this.supplier = supplier;
+ }
- FileStoreRead<T> withFilter(Predicate predicate);
+ public T get() {
+ if (!initialized) {
+ T t = supplier.get();
+ value = t;
+ initialized = true;
+ return t;
+ }
+ return value;
+ }
- /** Create a {@link RecordReader} from split. */
- RecordReader<T> createReader(DataSplit split) throws IOException;
+ public boolean initialized() {
+ return initialized;
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 0d546e215..186fd4799 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -22,9 +22,9 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.manifest.ManifestCacheFilter;
-import org.apache.paimon.operation.AppendOnlyFileStoreRead;
import org.apache.paimon.operation.AppendOnlyFileStoreScan;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
+import org.apache.paimon.operation.RawFileSplitRead;
import org.apache.paimon.operation.ScanBucketFilter;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
@@ -79,8 +79,8 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
}
@Override
- public AppendOnlyFileStoreRead newRead() {
- return new AppendOnlyFileStoreRead(
+ public RawFileSplitRead newRead() {
+ return new RawFileSplitRead(
fileIO,
schemaManager,
schema,
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index cd38d2061..6731121c5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -23,11 +23,11 @@ import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.operation.FileStoreCommit;
-import org.apache.paimon.operation.FileStoreRead;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.FileStoreWrite;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.SnapshotDeletion;
+import org.apache.paimon.operation.SplitRead;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.stats.StatsFileHandler;
@@ -73,7 +73,7 @@ public interface FileStore<T> extends Serializable {
StatsFileHandler newStatsFileHandler();
- FileStoreRead<T> newRead();
+ SplitRead<T> newRead();
FileStoreWrite<T> newWrite(String commitUser);
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 956b615d7..354cc6dda 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -28,9 +28,10 @@ import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
-import org.apache.paimon.operation.KeyValueFileStoreRead;
import org.apache.paimon.operation.KeyValueFileStoreScan;
import org.apache.paimon.operation.KeyValueFileStoreWrite;
+import org.apache.paimon.operation.MergeFileSplitRead;
+import org.apache.paimon.operation.RawFileSplitRead;
import org.apache.paimon.operation.ScanBucketFilter;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
@@ -119,8 +120,8 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
}
@Override
- public KeyValueFileStoreRead newRead() {
- return new KeyValueFileStoreRead(
+ public MergeFileSplitRead newRead() {
+ return new MergeFileSplitRead(
options,
schema,
keyType,
@@ -130,6 +131,16 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
newReaderFactoryBuilder());
}
+ public RawFileSplitRead newBatchRawFileRead() {
+ return new RawFileSplitRead(
+ fileIO,
+ schemaManager,
+ schema,
+ valueType,
+ FileFormatDiscover.of(options),
+ pathFactory());
+ }
+
public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
return KeyValueFileReaderFactory.builder(
fileIO,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
index dadde99ea..6cc8b396f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
@@ -24,7 +24,6 @@ import org.apache.paimon.reader.RecordReader;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.util.Optional;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -40,18 +39,6 @@ public class ApplyDeletionVectorReader<T> implements
RecordReader<T> {
this.deletionVector = deletionVector;
}
- public static <T> RecordReader<T> create(RecordReader<T> reader,
Optional<DeletionVector> dv) {
- return create(reader, dv.orElse(null));
- }
-
- public static <T> RecordReader<T> create(RecordReader<T> reader, @Nullable
DeletionVector dv) {
- if (dv == null) {
- return reader;
- }
-
- return new ApplyDeletionVectorReader<>(reader, dv);
- }
-
@Nullable
@Override
public RecordIterator<T> readBatch() throws IOException {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
deleted file mode 100644
index 49eea905c..000000000
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.operation;
-
-import org.apache.paimon.AppendOnlyFileStore;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.format.FileFormatDiscover;
-import org.apache.paimon.format.FormatKey;
-import org.apache.paimon.format.FormatReaderContext;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.DataFilePathFactory;
-import org.apache.paimon.io.FileRecordReader;
-import org.apache.paimon.mergetree.compact.ConcatRecordReader;
-import org.apache.paimon.partition.PartitionUtils;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.schema.IndexCastMapping;
-import org.apache.paimon.schema.SchemaEvolutionUtil;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.BulkFormatMapping;
-import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.Projection;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
-
-/** {@link FileStoreRead} for {@link AppendOnlyFileStore}. */
-public class AppendOnlyFileStoreRead implements FileStoreRead<InternalRow> {
-
- private static final Logger LOG =
LoggerFactory.getLogger(AppendOnlyFileStoreRead.class);
-
- private final FileIO fileIO;
- private final SchemaManager schemaManager;
- private final TableSchema schema;
- private final FileFormatDiscover formatDiscover;
- private final FileStorePathFactory pathFactory;
- private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
-
- private int[][] projection;
-
- @Nullable private List<Predicate> filters;
-
- public AppendOnlyFileStoreRead(
- FileIO fileIO,
- SchemaManager schemaManager,
- TableSchema schema,
- RowType rowType,
- FileFormatDiscover formatDiscover,
- FileStorePathFactory pathFactory) {
- this.fileIO = fileIO;
- this.schemaManager = schemaManager;
- this.schema = schema;
- this.formatDiscover = formatDiscover;
- this.pathFactory = pathFactory;
- this.bulkFormatMappings = new HashMap<>();
-
- this.projection = Projection.range(0,
rowType.getFieldCount()).toNestedIndexes();
- }
-
- public FileStoreRead<InternalRow> withProjection(int[][] projectedFields) {
- projection = projectedFields;
- return this;
- }
-
- @Override
- public FileStoreRead<InternalRow> withFilter(Predicate predicate) {
- this.filters = splitAnd(predicate);
- return this;
- }
-
- @Override
- public RecordReader<InternalRow> createReader(DataSplit split) throws
IOException {
- DataFilePathFactory dataFilePathFactory =
- pathFactory.createDataFilePathFactory(split.partition(),
split.bucket());
- List<ConcatRecordReader.ReaderSupplier<InternalRow>> suppliers = new
ArrayList<>();
- if (split.beforeFiles().size() > 0) {
- LOG.info("Ignore split before files: " + split.beforeFiles());
- }
- for (DataFileMeta file : split.dataFiles()) {
- String formatIdentifier =
DataFilePathFactory.formatIdentifier(file.fileName());
- BulkFormatMapping bulkFormatMapping =
- bulkFormatMappings.computeIfAbsent(
- new FormatKey(file.schemaId(), formatIdentifier),
- key -> {
- TableSchema tableSchema = schema;
- TableSchema dataSchema =
- key.schemaId == schema.id()
- ? schema
- :
schemaManager.schema(key.schemaId);
-
- // projection to data schema
- int[][] dataProjection =
-
SchemaEvolutionUtil.createDataProjection(
- tableSchema.fields(),
- dataSchema.fields(),
- projection);
-
- IndexCastMapping indexCastMapping =
-
SchemaEvolutionUtil.createIndexCastMapping(
-
Projection.of(projection).toTopLevelIndexes(),
- tableSchema.fields(),
-
Projection.of(dataProjection).toTopLevelIndexes(),
- dataSchema.fields());
-
- List<Predicate> dataFilters =
- this.schema.id() == key.schemaId
- ? filters
- :
SchemaEvolutionUtil.createDataFilters(
- tableSchema.fields(),
- dataSchema.fields(),
- filters);
-
- Pair<int[], RowType> partitionPair = null;
- if (!dataSchema.partitionKeys().isEmpty()) {
- Pair<int[], int[][]> partitionMapping =
-
PartitionUtils.constructPartitionMapping(
- dataSchema,
dataProjection);
- // if partition fields are not selected,
we just do nothing
- if (partitionMapping != null) {
- dataProjection =
partitionMapping.getRight();
- partitionPair =
- Pair.of(
-
partitionMapping.getLeft(),
-
dataSchema.projectedLogicalRowType(
-
dataSchema.partitionKeys()));
- }
- }
-
- RowType projectedRowType =
- Projection.of(dataProjection)
-
.project(dataSchema.logicalRowType());
-
- return new BulkFormatMapping(
- indexCastMapping.getIndexMapping(),
- indexCastMapping.getCastMapping(),
- partitionPair,
- formatDiscover
- .discover(formatIdentifier)
- .createReaderFactory(
- projectedRowType,
dataFilters));
- });
-
- final BinaryRow partition = split.partition();
- suppliers.add(
- () ->
- new FileRecordReader(
- bulkFormatMapping.getReaderFactory(),
- new FormatReaderContext(
- fileIO,
-
dataFilePathFactory.toPath(file.fileName()),
- file.fileSize()),
- bulkFormatMapping.getIndexMapping(),
- bulkFormatMapping.getCastMapping(),
- PartitionUtils.create(
-
bulkFormatMapping.getPartitionPair(), partition)));
- }
-
- return ConcatRecordReader.create(suppliers);
- }
-}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index a0d863371..b3361b0df 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -59,7 +59,7 @@ import static
org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber;
public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow> {
private final FileIO fileIO;
- private final AppendOnlyFileStoreRead read;
+ private final RawFileSplitRead read;
private final long schemaId;
private final RowType rowType;
private final FileFormat fileFormat;
@@ -81,7 +81,7 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
public AppendOnlyFileStoreWrite(
FileIO fileIO,
- AppendOnlyFileStoreRead read,
+ RawFileSplitRead read,
long schemaId,
String commitUser,
RowType rowType,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
similarity index 94%
rename from
paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
rename to
paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
index b0ab4338f..d457c9093 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
@@ -64,8 +64,13 @@ import static
org.apache.paimon.io.DataFilePathFactory.CHANGELOG_FILE_PREFIX;
import static org.apache.paimon.predicate.PredicateBuilder.containsFields;
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
-/** {@link FileStoreRead} implementation for {@link KeyValueFileStore}. */
-public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
+/**
+ * An implementation for {@link KeyValueFileStore}, this class handle LSM
merging and changelog row
+ * kind things, it will force reading fields such as sequence and row_kind.
+ *
+ * @see RawFileSplitRead If in batch mode and reading raw files, it is
recommended to use this read.
+ */
+public class MergeFileSplitRead implements SplitRead<KeyValue> {
private final TableSchema tableSchema;
private final FileIO fileIO;
@@ -86,7 +91,7 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
private boolean forceKeepDelete = false;
- public KeyValueFileStoreRead(
+ public MergeFileSplitRead(
CoreOptions options,
TableSchema schema,
RowType keyType,
@@ -105,13 +110,13 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
this.sequenceFields = options.sequenceField();
}
- public KeyValueFileStoreRead withKeyProjection(@Nullable int[][]
projectedFields) {
+ public MergeFileSplitRead withKeyProjection(@Nullable int[][]
projectedFields) {
readerFactoryBuilder.withKeyProjection(projectedFields);
this.keyProjectedFields = projectedFields;
return this;
}
- public KeyValueFileStoreRead withValueProjection(@Nullable int[][]
projectedFields) {
+ public MergeFileSplitRead withValueProjection(@Nullable int[][]
projectedFields) {
if (projectedFields == null) {
return this;
}
@@ -155,18 +160,22 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
return this;
}
- public KeyValueFileStoreRead withIOManager(IOManager ioManager) {
+ public MergeFileSplitRead withIOManager(IOManager ioManager) {
this.mergeSorter.setIOManager(ioManager);
return this;
}
- public KeyValueFileStoreRead forceKeepDelete() {
+ public MergeFileSplitRead forceKeepDelete() {
this.forceKeepDelete = true;
return this;
}
@Override
- public FileStoreRead<KeyValue> withFilter(Predicate predicate) {
+ public MergeFileSplitRead withFilter(Predicate predicate) {
+ if (predicate == null) {
+ return this;
+ }
+
List<Predicate> allFilters = new ArrayList<>();
List<Predicate> pkFilters = null;
List<String> primaryKeys = tableSchema.trimmedPrimaryKeys();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
new file mode 100644
index 000000000..c801dcaa8
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
+import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.format.FileFormatDiscover;
+import org.apache.paimon.format.FormatKey;
+import org.apache.paimon.format.FormatReaderContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.FileRecordReader;
+import org.apache.paimon.mergetree.compact.ConcatRecordReader;
+import org.apache.paimon.partition.PartitionUtils;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.IndexCastMapping;
+import org.apache.paimon.schema.SchemaEvolutionUtil;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BulkFormatMapping;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Projection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
+
+/** A {@link SplitRead} to read raw file directly from {@link DataSplit}. */
+public class RawFileSplitRead implements SplitRead<InternalRow> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RawFileSplitRead.class);
+
+ private final FileIO fileIO;
+ private final SchemaManager schemaManager;
+ private final TableSchema schema;
+ private final FileFormatDiscover formatDiscover;
+ private final FileStorePathFactory pathFactory;
+ private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
+
+ private int[][] projection;
+
+ @Nullable private List<Predicate> filters;
+
+ public RawFileSplitRead(
+ FileIO fileIO,
+ SchemaManager schemaManager,
+ TableSchema schema,
+ RowType rowType,
+ FileFormatDiscover formatDiscover,
+ FileStorePathFactory pathFactory) {
+ this.fileIO = fileIO;
+ this.schemaManager = schemaManager;
+ this.schema = schema;
+ this.formatDiscover = formatDiscover;
+ this.pathFactory = pathFactory;
+ this.bulkFormatMappings = new HashMap<>();
+
+ this.projection = Projection.range(0,
rowType.getFieldCount()).toNestedIndexes();
+ }
+
+ public RawFileSplitRead withProjection(int[][] projectedFields) {
+ if (projectedFields != null) {
+ projection = projectedFields;
+ }
+ return this;
+ }
+
+ @Override
+ public RawFileSplitRead withFilter(Predicate predicate) {
+ if (predicate != null) {
+ this.filters = splitAnd(predicate);
+ }
+ return this;
+ }
+
+ @Override
+ public RecordReader<InternalRow> createReader(DataSplit split) throws
IOException {
+ DataFilePathFactory dataFilePathFactory =
+ pathFactory.createDataFilePathFactory(split.partition(),
split.bucket());
+ List<ConcatRecordReader.ReaderSupplier<InternalRow>> suppliers = new
ArrayList<>();
+ if (split.beforeFiles().size() > 0) {
+ LOG.info("Ignore split before files: " + split.beforeFiles());
+ }
+
+ DeletionVector.Factory dvFactory =
+ DeletionVector.factory(
+ fileIO, split.dataFiles(),
split.deletionFiles().orElse(null));
+
+ for (DataFileMeta file : split.dataFiles()) {
+ String formatIdentifier =
DataFilePathFactory.formatIdentifier(file.fileName());
+ BulkFormatMapping bulkFormatMapping =
+ bulkFormatMappings.computeIfAbsent(
+ new FormatKey(file.schemaId(), formatIdentifier),
+ this::createBulkFormatMapping);
+
+ BinaryRow partition = split.partition();
+ suppliers.add(
+ () ->
+ createFileReader(
+ partition,
+ file,
+ dataFilePathFactory,
+ bulkFormatMapping,
+ dvFactory));
+ }
+
+ return ConcatRecordReader.create(suppliers);
+ }
+
+ private BulkFormatMapping createBulkFormatMapping(FormatKey key) {
+ TableSchema tableSchema = schema;
+ TableSchema dataSchema =
+ key.schemaId == schema.id() ? schema :
schemaManager.schema(key.schemaId);
+
+ // projection to data schema
+ int[][] dataProjection =
+ SchemaEvolutionUtil.createDataProjection(
+ tableSchema.fields(), dataSchema.fields(), projection);
+
+ IndexCastMapping indexCastMapping =
+ SchemaEvolutionUtil.createIndexCastMapping(
+ Projection.of(projection).toTopLevelIndexes(),
+ tableSchema.fields(),
+ Projection.of(dataProjection).toTopLevelIndexes(),
+ dataSchema.fields());
+
+ List<Predicate> dataFilters =
+ this.schema.id() == key.schemaId
+ ? filters
+ : SchemaEvolutionUtil.createDataFilters(
+ tableSchema.fields(), dataSchema.fields(),
filters);
+
+ Pair<int[], RowType> partitionPair = null;
+ if (!dataSchema.partitionKeys().isEmpty()) {
+ Pair<int[], int[][]> partitionMapping =
+ PartitionUtils.constructPartitionMapping(dataSchema,
dataProjection);
+ // if partition fields are not selected, we just do nothing
+ if (partitionMapping != null) {
+ dataProjection = partitionMapping.getRight();
+ partitionPair =
+ Pair.of(
+ partitionMapping.getLeft(),
+
dataSchema.projectedLogicalRowType(dataSchema.partitionKeys()));
+ }
+ }
+
+ RowType projectedRowType =
+
Projection.of(dataProjection).project(dataSchema.logicalRowType());
+
+ return new BulkFormatMapping(
+ indexCastMapping.getIndexMapping(),
+ indexCastMapping.getCastMapping(),
+ partitionPair,
+ formatDiscover
+ .discover(key.format)
+ .createReaderFactory(projectedRowType, dataFilters));
+ }
+
+ private RecordReader<InternalRow> createFileReader(
+ BinaryRow partition,
+ DataFileMeta file,
+ DataFilePathFactory dataFilePathFactory,
+ BulkFormatMapping bulkFormatMapping,
+ DeletionVector.Factory dvFactory)
+ throws IOException {
+ FileRecordReader fileRecordReader =
+ new FileRecordReader(
+ bulkFormatMapping.getReaderFactory(),
+ new FormatReaderContext(
+ fileIO,
+ dataFilePathFactory.toPath(file.fileName()),
+ file.fileSize()),
+ bulkFormatMapping.getIndexMapping(),
+ bulkFormatMapping.getCastMapping(),
+
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
+
+ Optional<DeletionVector> deletionVector =
dvFactory.create(file.fileName());
+ if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
+ return new ApplyDeletionVectorReader<>(fileRecordReader,
deletionVector.get());
+ }
+ return fileRecordReader;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
similarity index 93%
rename from
paimon-core/src/main/java/org/apache/paimon/operation/FileStoreRead.java
rename to paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
index 2d3e121b1..a5b05e8f4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
@@ -29,9 +29,9 @@ import java.io.IOException;
*
* @param <T> type of record to read.
*/
-public interface FileStoreRead<T> {
+public interface SplitRead<T> {
- FileStoreRead<T> withFilter(Predicate predicate);
+ SplitRead<T> withFilter(Predicate predicate);
/** Create a {@link RecordReader} from split. */
RecordReader<T> createReader(DataSplit split) throws IOException;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 2eb41fdd5..810669f2e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -24,11 +24,11 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestCacheFilter;
-import org.apache.paimon.operation.AppendOnlyFileStoreRead;
import org.apache.paimon.operation.AppendOnlyFileStoreScan;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.Lock;
+import org.apache.paimon.operation.RawFileSplitRead;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
@@ -112,8 +112,14 @@ class AppendOnlyFileStoreTable extends
AbstractFileStoreTable {
@Override
public InnerTableRead newRead() {
- AppendOnlyFileStoreRead read = store().newRead();
- return new AbstractDataTableRead<InternalRow>(read, schema()) {
+ RawFileSplitRead read = store().newRead();
+ return new AbstractDataTableRead<InternalRow>(schema()) {
+
+ @Override
+ protected InnerTableRead innerWithFilter(Predicate predicate) {
+ read.withFilter(predicate);
+ return this;
+ }
@Override
public void projection(int[][] projection) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index f2ab323d2..bf26ec31c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -32,7 +32,6 @@ import org.apache.paimon.operation.KeyValueFileStoreScan;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.query.LocalTableQuery;
@@ -42,7 +41,6 @@ import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.KeyValueTableRead;
import org.apache.paimon.table.source.MergeTreeSplitGenerator;
import org.apache.paimon.table.source.SplitGenerator;
-import org.apache.paimon.table.source.ValueContentRowDataRecordIterator;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
@@ -157,25 +155,8 @@ class PrimaryKeyFileStoreTable extends
AbstractFileStoreTable {
@Override
public InnerTableRead newRead() {
- return new KeyValueTableRead(store().newRead(), schema()) {
-
- @Override
- public void projection(int[][] projection) {
- read.withValueProjection(projection);
- }
-
- @Override
- protected RecordReader.RecordIterator<InternalRow>
rowDataRecordIteratorFromKv(
- RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
- return new ValueContentRowDataRecordIterator(kvRecordIterator);
- }
-
- @Override
- public InnerTableRead forceKeepDelete() {
- read.forceKeepDelete();
- return this;
- }
- };
+ return new KeyValueTableRead(
+ () -> store().newRead(), () -> store().newBatchRawFileRead(),
schema());
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
index 930cddcd5..bdb548d61 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
@@ -21,7 +21,6 @@ package org.apache.paimon.table.source;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.operation.DefaultValueAssigner;
-import org.apache.paimon.operation.FileStoreRead;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateProjectionConverter;
import org.apache.paimon.reader.RecordReader;
@@ -34,15 +33,13 @@ import java.util.Optional;
/** A {@link InnerTableRead} for data table. */
public abstract class AbstractDataTableRead<T> implements InnerTableRead {
- private final FileStoreRead<T> fileStoreRead;
private final DefaultValueAssigner defaultValueAssigner;
private int[][] projection;
private boolean executeFilter = false;
private Predicate predicate;
- public AbstractDataTableRead(FileStoreRead<T> fileStoreRead, TableSchema
schema) {
- this.fileStoreRead = fileStoreRead;
+ public AbstractDataTableRead(TableSchema schema) {
this.defaultValueAssigner = schema == null ? null :
DefaultValueAssigner.create(schema);
}
@@ -61,10 +58,11 @@ public abstract class AbstractDataTableRead<T> implements
InnerTableRead {
if (defaultValueAssigner != null) {
predicate = defaultValueAssigner.handlePredicate(predicate);
}
- fileStoreRead.withFilter(predicate);
- return this;
+ return innerWithFilter(predicate);
}
+ protected abstract InnerTableRead innerWithFilter(Predicate predicate);
+
@Override
public TableRead executeFilter() {
this.executeFilter = true;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index bb0354eee..f3f66c4aa 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -21,61 +21,120 @@ package org.apache.paimon.table.source;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.operation.KeyValueFileStoreRead;
+import org.apache.paimon.operation.MergeFileSplitRead;
+import org.apache.paimon.operation.RawFileSplitRead;
+import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.utils.LazyField;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.function.Supplier;
/**
- * An abstraction layer above {@link KeyValueFileStoreRead} to provide reading
of {@link
- * InternalRow}.
+ * An abstraction layer above {@link MergeFileSplitRead} to provide reading of
{@link InternalRow}.
*/
-public abstract class KeyValueTableRead extends
AbstractDataTableRead<KeyValue> {
+public final class KeyValueTableRead extends AbstractDataTableRead<KeyValue> {
- protected final KeyValueFileStoreRead read;
+ private final LazyField<MergeFileSplitRead> mergeRead;
+ private final LazyField<RawFileSplitRead> batchRawRead;
- protected KeyValueTableRead(KeyValueFileStoreRead read, TableSchema
schema) {
- super(read, schema);
- // We don't need any key fields, the columns that need to be read are
already included in
- // the value
- this.read = read.withKeyProjection(new int[0][]);
- }
+ private int[][] projection = null;
+ private boolean forceKeepDelete = false;
+ private Predicate predicate = null;
+ private IOManager ioManager = null;
- @Override
- public TableRead withIOManager(IOManager ioManager) {
- read.withIOManager(ioManager);
- return this;
+ public KeyValueTableRead(
+ Supplier<MergeFileSplitRead> mergeReadSupplier,
+ Supplier<RawFileSplitRead> batchRawReadSupplier,
+ TableSchema schema) {
+ super(schema);
+ this.mergeRead = new LazyField<>(() ->
createMergeRead(mergeReadSupplier));
+ this.batchRawRead = new LazyField<>(() ->
createBatchRawRead(batchRawReadSupplier));
}
- @Override
- public final RecordReader<InternalRow> reader(Split split) throws
IOException {
- return new RowDataRecordReader(read.createReader((DataSplit) split));
+ private MergeFileSplitRead createMergeRead(Supplier<MergeFileSplitRead>
readSupplier) {
+ MergeFileSplitRead read =
+ readSupplier
+ .get()
+ .withKeyProjection(new int[0][])
+ .withValueProjection(projection)
+ .withFilter(predicate)
+ .withIOManager(ioManager);
+ if (forceKeepDelete) {
+ read = read.forceKeepDelete();
+ }
+ return read;
}
- protected abstract RecordReader.RecordIterator<InternalRow>
rowDataRecordIteratorFromKv(
- RecordReader.RecordIterator<KeyValue> kvRecordIterator);
+ private RawFileSplitRead createBatchRawRead(Supplier<RawFileSplitRead>
readSupplier) {
+ return
readSupplier.get().withProjection(projection).withFilter(predicate);
+ }
- private class RowDataRecordReader implements RecordReader<InternalRow> {
+ @Override
+ public void projection(int[][] projection) {
+ if (mergeRead.initialized()) {
+ mergeRead.get().withValueProjection(projection);
+ }
+ if (batchRawRead.initialized()) {
+ batchRawRead.get().withProjection(projection);
+ }
+ this.projection = projection;
+ }
- private final RecordReader<KeyValue> wrapped;
+ @Override
+ public InnerTableRead forceKeepDelete() {
+ if (mergeRead.initialized()) {
+ mergeRead.get().forceKeepDelete();
+ }
+ this.forceKeepDelete = true;
+ return this;
+ }
- private RowDataRecordReader(RecordReader<KeyValue> wrapped) {
- this.wrapped = wrapped;
+ @Override
+ protected InnerTableRead innerWithFilter(Predicate predicate) {
+ if (mergeRead.initialized()) {
+ mergeRead.get().withFilter(predicate);
}
+ if (batchRawRead.initialized()) {
+ batchRawRead.get().withFilter(predicate);
+ }
+ this.predicate = predicate;
+ return this;
+ }
- @Nullable
- @Override
- public RecordIterator<InternalRow> readBatch() throws IOException {
- RecordIterator<KeyValue> batch = wrapped.readBatch();
- return batch == null ? null : rowDataRecordIteratorFromKv(batch);
+ @Override
+ public TableRead withIOManager(IOManager ioManager) {
+ if (mergeRead.initialized()) {
+ mergeRead.get().withIOManager(ioManager);
}
+ this.ioManager = ioManager;
+ return this;
+ }
- @Override
- public void close() throws IOException {
- wrapped.close();
+ @Override
+ public RecordReader<InternalRow> reader(Split split) throws IOException {
+ DataSplit dataSplit = (DataSplit) split;
+ if (!forceKeepDelete && !dataSplit.isStreaming() &&
split.convertToRawFiles().isPresent()) {
+ return batchRawRead.get().createReader(dataSplit);
}
+
+ RecordReader<KeyValue> reader =
mergeRead.get().createReader(dataSplit);
+ return new RecordReader<InternalRow>() {
+
+ @Nullable
+ @Override
+ public RecordIterator<InternalRow> readBatch() throws IOException {
+ RecordIterator<KeyValue> batch = reader.readBatch();
+ return batch == null ? null : new
ValueContentRowDataRecordIterator(batch);
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+ };
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
index 72b54ae6f..1b2c6299b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
@@ -22,7 +22,7 @@ import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
-import org.apache.paimon.operation.FileStoreRead;
+import org.apache.paimon.operation.SplitRead;
import org.apache.paimon.reader.RecordReader;
import java.io.IOException;
@@ -30,7 +30,7 @@ import java.util.ArrayList;
import java.util.List;
/**
- * An abstraction layer above {@link FileStoreRead} to provide reading of
{@link InternalRow}.
+ * An abstraction layer above {@link SplitRead} to provide reading of {@link
InternalRow}.
*
* @since 0.4.0
*/
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index b20f89b94..6adc3aff0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -36,9 +36,9 @@ import
org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.FileStoreCommitImpl;
-import org.apache.paimon.operation.FileStoreRead;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.Lock;
+import org.apache.paimon.operation.SplitRead;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReaderIterator;
@@ -426,7 +426,7 @@ public class TestFileStore extends KeyValueFileStore {
}
List<KeyValue> kvs = new ArrayList<>();
- FileStoreRead<KeyValue> read = newRead();
+ SplitRead<KeyValue> read = newRead();
for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>>
entryWithPartition :
filesPerPartitionAndBucket.entrySet()) {
for (Map.Entry<Integer, List<DataFileMeta>> entryWithBucket :
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
similarity index 99%
rename from
paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java
rename to
paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
index c46482556..806c869f9 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
@@ -65,8 +65,8 @@ import java.util.stream.Stream;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.assertj.core.api.Assertions.assertThat;
-/** Tests for {@link KeyValueFileStoreRead}. */
-public class KeyValueFileStoreReadTest {
+/** Tests for {@link MergeFileSplitRead}. */
+public class MergeFileSplitReadTest {
@TempDir java.nio.file.Path tempDir;
@@ -226,7 +226,7 @@ public class KeyValueFileStoreReadTest {
Map<BinaryRow, List<ManifestEntry>> filesGroupedByPartition =
scan.withSnapshot(snapshotId).plan().files().stream()
.collect(Collectors.groupingBy(ManifestEntry::partition));
- KeyValueFileStoreRead read = store.newRead();
+ MergeFileSplitRead read = store.newRead();
if (keyProjection != null) {
read.withKeyProjection(keyProjection);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
index 523033579..99af52ce6 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
@@ -124,9 +124,9 @@ public class MergeIntoActionITCase extends ActionITCaseBase
{
expected,
Arrays.asList(
changelogRow("+I", 1, "v_1", "creation", "02-27"),
- changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert",
"02-27"),
- changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert",
"02-27"),
- changelogRow("+U", 7, "Seven", "matched_upsert",
"02-28"),
+ changelogRow("+I", 2, "v_2_nmu", "not_matched_upsert",
"02-27"),
+ changelogRow("+I", 3, "v_3_nmu", "not_matched_upsert",
"02-27"),
+ changelogRow("+I", 7, "Seven", "matched_upsert",
"02-28"),
changelogRow("+I", 8, "v_8", "insert", "02-29"),
changelogRow("+I", 11, "v_11", "insert", "02-29"),
changelogRow("+I", 12, "v_12", "insert", "02-29")));
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index d51aefb96..7fd14b1ad 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -25,6 +25,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
@@ -32,16 +33,15 @@ import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
-import org.apache.paimon.operation.KeyValueFileStoreRead;
import org.apache.paimon.operation.KeyValueFileStoreWrite;
+import org.apache.paimon.operation.MergeFileSplitRead;
+import org.apache.paimon.operation.RawFileSplitRead;
import org.apache.paimon.options.Options;
-import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.KeyValueTableRead;
import org.apache.paimon.table.source.TableRead;
-import org.apache.paimon.table.source.ValueContentRowDataRecordIterator;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.IntType;
@@ -60,7 +60,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.function.Function;
import static java.util.Collections.singletonList;
@@ -110,20 +109,13 @@ public class TestChangelogDataReadWrite {
}
public TableRead createReadWithKey() {
- return createRead(ValueContentRowDataRecordIterator::new);
- }
-
- private TableRead createRead(
- Function<
- RecordReader.RecordIterator<KeyValue>,
- RecordReader.RecordIterator<InternalRow>>
- rowDataIteratorCreator) {
SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(),
tablePath);
CoreOptions options = new CoreOptions(new HashMap<>());
- KeyValueFileStoreRead read =
- new KeyValueFileStoreRead(
+ TableSchema schema = schemaManager.schema(0);
+ MergeFileSplitRead read =
+ new MergeFileSplitRead(
options,
- schemaManager.schema(0),
+ schema,
KEY_TYPE,
VALUE_TYPE,
COMPARATOR,
@@ -131,26 +123,22 @@ public class TestChangelogDataReadWrite {
KeyValueFileReaderFactory.builder(
LocalFileIO.create(),
schemaManager,
- schemaManager.schema(0),
+ schema,
KEY_TYPE,
VALUE_TYPE,
ignore -> avro,
pathFactory,
EXTRACTOR,
options));
- return new KeyValueTableRead(read, null) {
-
- @Override
- public void projection(int[][] projection) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected RecordReader.RecordIterator<InternalRow>
rowDataRecordIteratorFromKv(
- RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
- return rowDataIteratorCreator.apply(kvRecordIterator);
- }
- };
+ RawFileSplitRead rawFileRead =
+ new RawFileSplitRead(
+ LocalFileIO.create(),
+ schemaManager,
+ schema,
+ VALUE_TYPE,
+ FileFormatDiscover.of(options),
+ pathFactory);
+ return new KeyValueTableRead(() -> read, () -> rawFileRead, null);
}
public List<DataFileMeta> writeFiles(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
index 2303ba1bc..fc7e05d75 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import javax.annotation.Nullable;
@@ -261,15 +262,28 @@ public class ReadWriteTableTestUtil {
CloseableIterator<Row> resultItr = bEnv.executeSql(query).collect();
try (BlockingIterator<Row, Row> iterator =
BlockingIterator.of(resultItr)) {
if (!expected.isEmpty()) {
- assertThat(
- iterator.collect(
- expected.size(), TIME_OUT.getSize(),
TIME_OUT.getUnit()))
- .containsExactlyInAnyOrderElementsOf(expected);
+ List<Row> result =
+ iterator.collect(expected.size(), TIME_OUT.getSize(),
TIME_OUT.getUnit());
+ assertThat(toInsertOnlyRows(result))
+
.containsExactlyInAnyOrderElementsOf(toInsertOnlyRows(expected));
}
assertThat(resultItr.hasNext()).isFalse();
}
}
+ private static List<Row> toInsertOnlyRows(List<Row> rows) {
+ List<Row> result = new ArrayList<>();
+ for (Row row : rows) {
+ assertThat(row.getKind()).isIn(RowKind.INSERT,
RowKind.UPDATE_AFTER);
+ Row newRow = new Row(row.getArity());
+ for (int i = 0; i < row.getArity(); i++) {
+ newRow.setField(i, row.getField(i));
+ }
+ result.add(newRow);
+ }
+ return result;
+ }
+
public static BlockingIterator<Row, Row> testStreamingRead(String query,
List<Row> expected)
throws Exception {
BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());