This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 893ce76be [bug] Fix sort-spill-threshold with projection and refactor
Default values (#1572)
893ce76be is described below
commit 893ce76bee20c6226a47d2631d4cedaf7e514114
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jul 17 13:18:10 2023 +0800
[bug] Fix sort-spill-threshold with projection and refactor Default values
(#1572)
---
.../main/java/org/apache/paimon/CoreOptions.java | 12 +--
.../org/apache/paimon/casting/DefaultValueRow.java | 7 +-
.../paimon/io/KeyValueFileReaderFactory.java | 4 +
.../org/apache/paimon/mergetree/MergeSorter.java | 6 +-
...ValueAssiger.java => DefaultValueAssigner.java} | 103 +++++++++++----------
.../paimon/operation/KeyValueFileStoreRead.java | 1 +
.../org/apache/paimon/schema/SchemaValidation.java | 2 +-
.../paimon/table/AbstractFileStoreTable.java | 32 ++++---
.../paimon/table/AppendOnlyFileStoreTable.java | 7 ++
.../table/source/InnerStreamTableScanImpl.java | 11 +--
.../paimon/table/source/InnerTableScanImpl.java | 12 +--
.../org/apache/paimon/table/source/TableRead.java | 4 +-
.../table/source/snapshot/SnapshotReaderImpl.java | 11 +--
.../apache/paimon/table/system/AuditLogTable.java | 8 ++
.../apache/paimon/table/system/BucketsTable.java | 7 ++
.../apache/paimon/table/system/ConsumersTable.java | 6 ++
.../org/apache/paimon/table/system/FilesTable.java | 7 ++
.../apache/paimon/table/system/ManifestsTable.java | 7 ++
.../apache/paimon/table/system/OptionsTable.java | 7 ++
.../apache/paimon/table/system/SchemasTable.java | 6 ++
.../apache/paimon/table/system/SnapshotsTable.java | 7 ++
.../org/apache/paimon/table/system/TagsTable.java | 7 ++
...igerTest.java => DefaultValueAssignerTest.java} | 22 ++---
.../apache/paimon/flink/BatchFileStoreITCase.java | 23 +++--
24 files changed, 207 insertions(+), 112 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index c979a19c0..4d16f7554 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1101,18 +1101,18 @@ public class CoreOptions implements Serializable {
return options.get(METASTORE_PARTITIONED_TABLE);
}
- public Options getFieldDefaultValues() {
- Map<String, String> defultValues = new HashMap<>();
+ public Map<String, String> getFieldDefaultValues() {
+ Map<String, String> defaultValues = new HashMap<>();
+ String fieldPrefix = FIELDS_PREFIX + ".";
+ String defaultValueSuffix = "." + DEFAULT_VALUE_SUFFIX;
for (Map.Entry<String, String> option : options.toMap().entrySet()) {
String key = option.getKey();
- String fieldPrefix = FIELDS_PREFIX + ".";
- String defaultValueSuffix = "." + DEFAULT_VALUE_SUFFIX;
if (key != null && key.startsWith(fieldPrefix) &&
key.endsWith(defaultValueSuffix)) {
String fieldName = key.replace(fieldPrefix,
"").replace(defaultValueSuffix, "");
- defultValues.put(fieldName, option.getValue());
+ defaultValues.put(fieldName, option.getValue());
}
}
- return new Options(defultValues);
+ return defaultValues;
}
public List<CommitCallback> commitCallbacks() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/casting/DefaultValueRow.java
b/paimon-core/src/main/java/org/apache/paimon/casting/DefaultValueRow.java
index 10f2e5cd1..60dae11a7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/casting/DefaultValueRow.java
+++ b/paimon-core/src/main/java/org/apache/paimon/casting/DefaultValueRow.java
@@ -31,9 +31,10 @@ import org.apache.paimon.types.RowKind;
* InternalRow}.
*/
public class DefaultValueRow implements InternalRow {
+
private InternalRow row;
- private InternalRow defaultValueRow;
+ private final InternalRow defaultValueRow;
private DefaultValueRow(InternalRow defaultValueRow) {
this.defaultValueRow = defaultValueRow;
@@ -44,6 +45,10 @@ public class DefaultValueRow implements InternalRow {
return this;
}
+ public InternalRow defaultValueRow() {
+ return defaultValueRow;
+ }
+
@Override
public int getFieldCount() {
return row.getFieldCount();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 75de5709d..e9413715f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -169,6 +169,10 @@ public class KeyValueFileReaderFactory {
return this;
}
+ public RowType projectedValueType() {
+ return projectedValueType;
+ }
+
public KeyValueFileReaderFactory build(BinaryRow partition, int
bucket) {
return build(partition, bucket, true, Collections.emptyList());
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
index 9f5b34e12..a63c1d21c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
@@ -67,7 +67,7 @@ import static
org.apache.paimon.schema.SystemColumns.VALUE_KIND;
public class MergeSorter {
private final RowType keyType;
- private final RowType valueType;
+ private RowType valueType;
private final SortEngine sortEngine;
private final int spillThreshold;
@@ -100,6 +100,10 @@ public class MergeSorter {
this.ioManager = ioManager;
}
+ public void setProjectedValueType(RowType projectedType) {
+ this.valueType = projectedType;
+ }
+
public <T> RecordReader<T> mergeSort(
List<ReaderSupplier<KeyValue>> lazyReaders,
Comparator<InternalRow> keyComparator,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssiger.java
b/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
similarity index 57%
rename from
paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssiger.java
rename to
paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
index adaa645fe..730aafe8c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssiger.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
@@ -19,6 +19,7 @@
package org.apache.paimon.operation;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.casting.CastExecutor;
import org.apache.paimon.casting.CastExecutors;
import org.apache.paimon.casting.DefaultValueRow;
@@ -41,86 +42,82 @@ import java.util.Map;
import java.util.Optional;
/**
- * the field Default value assigner. note that the invoke of assigning should
be after merge and
- * schema evolution
+ * The field Default value assigner. note that invoke of assigning should be
after merge and schema
+ * evolution.
*/
-public class DefaultValueAssiger {
+public class DefaultValueAssigner {
- private GenericRow defaultValueMapping;
- private TableSchema tableSchema;
+ private final RowType rowType;
+ private final Map<String, String> defaultValues;
- private Map<String, String> defaultValues;
+ private boolean needToAssign;
private int[][] project;
+ private DefaultValueRow defaultValueRow;
- private boolean isCacheDefaultMapping;
-
- public DefaultValueAssiger(TableSchema tableSchema) {
- this.tableSchema = tableSchema;
-
- CoreOptions coreOptions = new CoreOptions(tableSchema.options());
- defaultValues = coreOptions.getFieldDefaultValues().toMap();
+ private DefaultValueAssigner(Map<String, String> defaultValues, RowType
rowType) {
+ this.defaultValues = defaultValues;
+ this.needToAssign = defaultValues.size() > 0;
+ this.rowType = rowType;
}
- public DefaultValueAssiger handleProject(int[][] project) {
+ public DefaultValueAssigner handleProject(int[][] project) {
this.project = project;
+ if (project != null) {
+ List<String> projected =
Projection.of(project).project(rowType).getFieldNames();
+ needToAssign =
defaultValues.keySet().stream().anyMatch(projected::contains);
+ }
return this;
}
- /** assign default value for colomn which value is null. */
+ public boolean needToAssign() {
+ return needToAssign;
+ }
+
+ /** assign default value for column which value is null. */
public RecordReader<InternalRow>
assignFieldsDefaultValue(RecordReader<InternalRow> reader) {
- if (defaultValues.isEmpty()) {
+ if (!needToAssign) {
return reader;
}
- if (!isCacheDefaultMapping) {
- isCacheDefaultMapping = true;
- this.defaultValueMapping = createDefaultValueMapping();
+ if (defaultValueRow == null) {
+ defaultValueRow = createDefaultValueRow();
}
- RecordReader<InternalRow> result = reader;
- if (defaultValueMapping != null) {
- DefaultValueRow defaultValueRow =
DefaultValueRow.from(defaultValueMapping);
- result = reader.transform(defaultValueRow::replaceRow);
- }
- return result;
+ return reader.transform(defaultValueRow::replaceRow);
}
- GenericRow createDefaultValueMapping() {
-
- RowType valueType = tableSchema.logicalRowType();
-
+ @VisibleForTesting
+ DefaultValueRow createDefaultValueRow() {
List<DataField> fields;
if (project != null) {
- fields = Projection.of(project).project(valueType).getFields();
+ fields = Projection.of(project).project(rowType).getFields();
} else {
- fields = valueType.getFields();
+ fields = rowType.getFields();
}
- GenericRow defaultValuesMa = null;
- if (!fields.isEmpty()) {
- defaultValuesMa = new GenericRow(fields.size());
- for (int i = 0; i < fields.size(); i++) {
- DataField dataField = fields.get(i);
- String defaultValueStr = defaultValues.get(dataField.name());
- if (defaultValueStr == null) {
- continue;
- }
+ GenericRow row = new GenericRow(fields.size());
+ for (int i = 0; i < fields.size(); i++) {
+ DataField dataField = fields.get(i);
+ String defaultValueStr = defaultValues.get(dataField.name());
+ if (defaultValueStr == null) {
+ continue;
+ }
- CastExecutor<Object, Object> resolve =
- (CastExecutor<Object, Object>)
- CastExecutors.resolve(VarCharType.STRING_TYPE,
dataField.type());
+ @SuppressWarnings("unchecked")
+ CastExecutor<Object, Object> resolve =
+ (CastExecutor<Object, Object>)
+ CastExecutors.resolve(VarCharType.STRING_TYPE,
dataField.type());
- if (resolve == null) {
- throw new RuntimeException(
- "Default value do not support the type of " +
dataField.type());
- }
- Object defaultValue =
resolve.cast(BinaryString.fromString(defaultValueStr));
- defaultValuesMa.setField(i, defaultValue);
+ if (resolve == null) {
+ throw new RuntimeException(
+ "Default value do not support the type of " +
dataField.type());
}
+ Object defaultValue =
resolve.cast(BinaryString.fromString(defaultValueStr));
+ row.setField(i, defaultValue);
}
- return defaultValuesMa;
+ return DefaultValueRow.from(row);
}
public Predicate handlePredicate(Predicate filters) {
@@ -154,4 +151,10 @@ public class DefaultValueAssiger {
}
return result;
}
+
+ public static DefaultValueAssigner create(TableSchema schema) {
+ CoreOptions coreOptions = new CoreOptions(schema.options());
+ Map<String, String> defaultValues =
coreOptions.getFieldDefaultValues();
+ return new DefaultValueAssigner(defaultValues,
schema.logicalRowType());
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
index 3a666e709..0a2a31dcf 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
@@ -127,6 +127,7 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
this.outerProjection = projection.outerProjection;
if (pushdownProjection != null) {
readerFactoryBuilder.withValueProjection(pushdownProjection);
+
mergeSorter.setProjectedValueType(readerFactoryBuilder.projectedValueType());
}
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index b04554a83..df2d2c11a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -297,7 +297,7 @@ public class SchemaValidation {
private static void validateDefaultValues(TableSchema schema) {
CoreOptions coreOptions = new CoreOptions(schema.options());
- Map<String, String> defaultValues =
coreOptions.getFieldDefaultValues().toMap();
+ Map<String, String> defaultValues =
coreOptions.getFieldDefaultValues();
if (!defaultValues.isEmpty()) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 5b042b228..3dfeabc8b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -22,11 +22,12 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.metastore.AddPartitionCommitCallback;
import org.apache.paimon.metastore.MetastoreClient;
-import org.apache.paimon.operation.DefaultValueAssiger;
+import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
@@ -48,6 +49,7 @@ import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.InnerTableScanImpl;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
+import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
import
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
@@ -126,7 +128,7 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
snapshotManager(),
splitGenerator(),
nonPartitionFilterConsumer(),
- new DefaultValueAssiger(tableSchema));
+ DefaultValueAssigner.create(tableSchema));
}
@Override
@@ -135,7 +137,7 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
coreOptions(),
newSnapshotReader(),
snapshotManager(),
- new DefaultValueAssiger(tableSchema));
+ DefaultValueAssigner.create(tableSchema));
}
@Override
@@ -145,7 +147,7 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
newSnapshotReader(),
snapshotManager(),
supportStreamingReadOverwrite(),
- new DefaultValueAssiger(tableSchema));
+ DefaultValueAssigner.create(tableSchema));
}
public abstract SplitGenerator splitGenerator();
@@ -319,27 +321,35 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
@Override
public InnerTableRead newRead() {
InnerTableRead innerTableRead = innerRead();
- DefaultValueAssiger defaultValueAssiger = new
DefaultValueAssiger(tableSchema);
+ DefaultValueAssigner defaultValueAssigner =
DefaultValueAssigner.create(tableSchema);
+ if (!defaultValueAssigner.needToAssign()) {
+ return innerTableRead;
+ }
+
return new InnerTableRead() {
@Override
public InnerTableRead withFilter(Predicate predicate) {
-
innerTableRead.withFilter(defaultValueAssiger.handlePredicate(predicate));
+
innerTableRead.withFilter(defaultValueAssigner.handlePredicate(predicate));
return this;
}
@Override
public InnerTableRead withProjection(int[][] projection) {
- defaultValueAssiger.handleProject(projection);
+ defaultValueAssigner.handleProject(projection);
innerTableRead.withProjection(projection);
return this;
}
+ @Override
+ public TableRead withIOManager(IOManager ioManager) {
+ innerTableRead.withIOManager(ioManager);
+ return this;
+ }
+
@Override
public RecordReader<InternalRow> createReader(Split split) throws
IOException {
- RecordReader<InternalRow> reader =
- defaultValueAssiger.assignFieldsDefaultValue(
- innerTableRead.createReader(split));
- return reader;
+ return defaultValueAssigner.assignFieldsDefaultValue(
+ innerTableRead.createReader(split));
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 32f4a160a..787dbeff1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -22,6 +22,7 @@ import org.apache.paimon.AppendOnlyFileStore;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.WriteMode;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestCacheFilter;
@@ -40,6 +41,7 @@ import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
+import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.Preconditions;
@@ -128,6 +130,11 @@ public class AppendOnlyFileStoreTable extends
AbstractFileStoreTable {
return this;
}
+ @Override
+ public TableRead withIOManager(IOManager ioManager) {
+ return this;
+ }
+
@Override
public RecordReader<InternalRow> createReader(Split split) throws
IOException {
return read.createReader((DataSplit) split);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
index 9b50e7831..3040ee5b7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
@@ -21,7 +21,7 @@ package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.consumer.Consumer;
-import org.apache.paimon.operation.DefaultValueAssiger;
+import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.snapshot.BoundedChecker;
import
org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner;
@@ -51,6 +51,7 @@ public class InnerStreamTableScanImpl extends
AbstractInnerTableScan
private final CoreOptions options;
private final SnapshotManager snapshotManager;
private final boolean supportStreamingReadOverwrite;
+ private final DefaultValueAssigner defaultValueAssigner;
private StartingScanner startingScanner;
private FollowUpScanner followUpScanner;
@@ -59,24 +60,22 @@ public class InnerStreamTableScanImpl extends
AbstractInnerTableScan
@Nullable private Long currentWatermark;
@Nullable private Long nextSnapshotId;
- private DefaultValueAssiger defaultValueAssiger;
-
public InnerStreamTableScanImpl(
CoreOptions options,
SnapshotReader snapshotReader,
SnapshotManager snapshotManager,
boolean supportStreamingReadOverwrite,
- DefaultValueAssiger defaultValueAssiger) {
+ DefaultValueAssigner defaultValueAssigner) {
super(options, snapshotReader);
this.options = options;
this.snapshotManager = snapshotManager;
this.supportStreamingReadOverwrite = supportStreamingReadOverwrite;
- this.defaultValueAssiger = defaultValueAssiger;
+ this.defaultValueAssigner = defaultValueAssigner;
}
@Override
public InnerStreamTableScanImpl withFilter(Predicate predicate) {
-
snapshotReader.withFilter(defaultValueAssiger.handlePredicate(predicate));
+
snapshotReader.withFilter(defaultValueAssigner.handlePredicate(predicate));
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
index caa8fbaf3..ea19a1ab9 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
@@ -19,7 +19,7 @@
package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.operation.DefaultValueAssiger;
+import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
@@ -29,27 +29,25 @@ import org.apache.paimon.utils.SnapshotManager;
public class InnerTableScanImpl extends AbstractInnerTableScan {
private final SnapshotManager snapshotManager;
+ private final DefaultValueAssigner defaultValueAssigner;
private StartingScanner startingScanner;
-
private boolean hasNext;
- private DefaultValueAssiger defaultValueAssiger;
-
public InnerTableScanImpl(
CoreOptions options,
SnapshotReader snapshotReader,
SnapshotManager snapshotManager,
- DefaultValueAssiger defaultValueAssiger) {
+ DefaultValueAssigner defaultValueAssigner) {
super(options, snapshotReader);
this.snapshotManager = snapshotManager;
this.hasNext = true;
- this.defaultValueAssiger = defaultValueAssiger;
+ this.defaultValueAssigner = defaultValueAssigner;
}
@Override
public InnerTableScan withFilter(Predicate predicate) {
-
snapshotReader.withFilter(defaultValueAssiger.handlePredicate(predicate));
+
snapshotReader.withFilter(defaultValueAssigner.handlePredicate(predicate));
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
index 8ba6eed3b..adc8dbed9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
@@ -37,9 +37,7 @@ import java.util.List;
@Public
public interface TableRead {
- default TableRead withIOManager(IOManager ioManager) {
- return this;
- }
+ TableRead withIOManager(IOManager ioManager);
RecordReader<InternalRow> createReader(Split split) throws IOException;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 3c17bb51d..848c40175 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -28,7 +28,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.operation.DefaultValueAssiger;
+import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.predicate.Predicate;
@@ -67,12 +67,11 @@ public class SnapshotReaderImpl implements SnapshotReader {
private final ConsumerManager consumerManager;
private final SplitGenerator splitGenerator;
private final BiConsumer<FileStoreScan, Predicate>
nonPartitionFilterConsumer;
+ private final DefaultValueAssigner defaultValueAssigner;
private ScanKind scanKind = ScanKind.ALL;
private RecordComparator lazyPartitionComparator;
- private DefaultValueAssiger defaultValueAssiger;
-
public SnapshotReaderImpl(
FileStoreScan scan,
TableSchema tableSchema,
@@ -80,7 +79,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
SnapshotManager snapshotManager,
SplitGenerator splitGenerator,
BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer,
- DefaultValueAssiger defaultValueAssiger) {
+ DefaultValueAssigner defaultValueAssigner) {
this.scan = scan;
this.tableSchema = tableSchema;
this.options = options;
@@ -89,7 +88,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
new ConsumerManager(snapshotManager.fileIO(),
snapshotManager.tablePath());
this.splitGenerator = splitGenerator;
this.nonPartitionFilterConsumer = nonPartitionFilterConsumer;
- this.defaultValueAssiger = defaultValueAssiger;
+ this.defaultValueAssigner = defaultValueAssigner;
}
@Override
@@ -130,7 +129,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
List<Predicate> partitionFilters = new ArrayList<>();
List<Predicate> nonPartitionFilters = new ArrayList<>();
for (Predicate p :
-
PredicateBuilder.splitAnd(defaultValueAssiger.handlePredicate(predicate))) {
+
PredicateBuilder.splitAnd(defaultValueAssigner.handlePredicate(predicate))) {
Optional<Predicate> mapped = transformFieldMapping(p,
fieldIdxToPartitionIdx);
if (mapped.isPresent()) {
partitionFilters.add(mapped.get());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 5750bb822..cc1f98f4b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -24,6 +24,7 @@ import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.ScanKind;
@@ -41,6 +42,7 @@ import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
+import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowKind;
@@ -382,6 +384,12 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
return this;
}
+ @Override
+ public TableRead withIOManager(IOManager ioManager) {
+ this.dataRead.withIOManager(ioManager);
+ return this;
+ }
+
@Override
public RecordReader<InternalRow> createReader(Split split) throws
IOException {
return dataRead.createReader(split).transform(this::convertRow);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
index c8aea53f5..971b97d3c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.system;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.DataFileMeta;
@@ -35,6 +36,7 @@ import org.apache.paimon.table.source.InnerStreamTableScan;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
@@ -154,6 +156,11 @@ public class BucketsTable implements DataTable,
ReadonlyTable {
throw new UnsupportedOperationException("BucketsRead does not
support projection");
}
+ @Override
+ public TableRead withIOManager(IOManager ioManager) {
+ return this;
+ }
+
@Override
public RecordReader<InternalRow> createReader(Split split) throws
IOException {
if (!(split instanceof DataSplit)) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
index 570dc4683..896acfccc 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
@@ -22,6 +22,7 @@ import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.Predicate;
@@ -175,6 +176,11 @@ public class ConsumersTable implements ReadonlyTable {
return this;
}
+ @Override
+ public TableRead withIOManager(IOManager ioManager) {
+ return this;
+ }
+
@Override
public RecordReader<InternalRow> createReader(Split split) throws
IOException {
if (!(split instanceof ConsumersTable.ConsumersSplit)) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index 94858164f..c00900734 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.system;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.LazyGenericRow;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.format.FieldStats;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
@@ -39,6 +40,7 @@ import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadOnceTableScan;
import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
@@ -217,6 +219,11 @@ public class FilesTable implements ReadonlyTable {
return this;
}
+ @Override
+ public TableRead withIOManager(IOManager ioManager) {
+ return this;
+ }
+
@Override
public RecordReader<InternalRow> createReader(Split split) throws
IOException {
if (!(split instanceof FilesSplit)) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
index bad1a374a..9357cd64f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
@@ -23,6 +23,7 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -36,6 +37,7 @@ import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadOnceTableScan;
import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
@@ -188,6 +190,11 @@ public class ManifestsTable implements ReadonlyTable {
return this;
}
+ @Override
+ public TableRead withIOManager(IOManager ioManager) {
+ return this;
+ }
+
@Override
public RecordReader<InternalRow> createReader(Split split) throws IOException {
if (!(split instanceof ManifestsSplit)) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
index a104a79c8..81dfa094c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.system;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.Predicate;
@@ -32,6 +33,7 @@ import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadOnceTableScan;
import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.IteratorRecordReader;
@@ -169,6 +171,11 @@ public class OptionsTable implements ReadonlyTable {
return this;
}
+ @Override
+ public TableRead withIOManager(IOManager ioManager) {
+ return this;
+ }
+
@Override
public RecordReader<InternalRow> createReader(Split split) throws IOException {
if (!(split instanceof OptionsSplit)) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
index 91d553755..eed7d4d7b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.system;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.Predicate;
@@ -181,6 +182,11 @@ public class SchemasTable implements ReadonlyTable {
return this;
}
+ @Override
+ public TableRead withIOManager(IOManager ioManager) {
+ return this;
+ }
+
@Override
public RecordReader<InternalRow> createReader(Split split) throws IOException {
if (!(split instanceof SchemasSplit)) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
index b4333f221..0a73bbebd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.Predicate;
@@ -33,6 +34,7 @@ import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadOnceTableScan;
import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
@@ -192,6 +194,11 @@ public class SnapshotsTable implements ReadonlyTable {
return this;
}
+ @Override
+ public TableRead withIOManager(IOManager ioManager) {
+ return this;
+ }
+
@Override
public RecordReader<InternalRow> createReader(Split split) throws IOException {
if (!(split instanceof SnapshotsSplit)) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
index a7b6ba447..eecea7d0e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.Predicate;
@@ -33,6 +34,7 @@ import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadOnceTableScan;
import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
@@ -178,6 +180,11 @@ public class TagsTable implements ReadonlyTable {
return this;
}
+ @Override
+ public TableRead withIOManager(IOManager ioManager) {
+ return this;
+ }
+
@Override
public RecordReader<InternalRow> createReader(Split split) {
if (!(split instanceof TagsSplit)) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssigerTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssignerTest.java
similarity index 86%
rename from paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssigerTest.java
rename to paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssignerTest.java
index 608964ad0..99864b31a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssigerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssignerTest.java
@@ -19,7 +19,7 @@
package org.apache.paimon.operation;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.predicate.Predicate;
@@ -44,7 +44,7 @@ import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
-class DefaultValueAssigerTest {
+class DefaultValueAssignerTest {
@TempDir java.nio.file.Path tempDir;
private TableSchema tableSchema;
@@ -82,20 +82,20 @@ class DefaultValueAssigerTest {
@Test
public void testGeneralRow() {
- DefaultValueAssiger defaultValueAssiger = new DefaultValueAssiger(tableSchema);
+ DefaultValueAssigner defaultValueAssigner = DefaultValueAssigner.create(tableSchema);
int[] projection = tableSchema.projection(Lists.newArrayList("col5", "col4", "col0"));
Projection top = Projection.of(projection);
int[][] nest = top.toNestedIndexes();
- defaultValueAssiger = defaultValueAssiger.handleProject(nest);
- GenericRow row = defaultValueAssiger.createDefaultValueMapping();
+ defaultValueAssigner = defaultValueAssigner.handleProject(nest);
+ InternalRow row = defaultValueAssigner.createDefaultValueRow().defaultValueRow();
assertEquals(
"1|0|null",
String.format("%s|%s|%s", row.getString(0), row.getString(1), row.getString(2)));
}
@Test
- public void testHanldePredicate() {
- DefaultValueAssiger defaultValueAssiger = new DefaultValueAssiger(tableSchema);
+ public void testHandlePredicate() {
+ DefaultValueAssigner defaultValueAssigner = DefaultValueAssigner.create(tableSchema);
PredicateBuilder predicateBuilder = new PredicateBuilder(tableSchema.logicalRowType());
{
@@ -103,7 +103,7 @@ class DefaultValueAssigerTest {
PredicateBuilder.and(
predicateBuilder.equal(predicateBuilder.indexOf("col5"), "100"),
predicateBuilder.equal(predicateBuilder.indexOf("col1"), "1"));
- Predicate actual = defaultValueAssiger.handlePredicate(predicate);
+ Predicate actual = defaultValueAssigner.handlePredicate(predicate);
assertEquals(actual, predicateBuilder.equal(predicateBuilder.indexOf("col1"), "1"));
}
@@ -112,18 +112,18 @@ class DefaultValueAssigerTest {
PredicateBuilder.and(
predicateBuilder.equal(predicateBuilder.indexOf("col5"), "100"),
predicateBuilder.equal(predicateBuilder.indexOf("col4"), "1"));
- Predicate actual = defaultValueAssiger.handlePredicate(predicate);
+ Predicate actual = defaultValueAssigner.handlePredicate(predicate);
Assertions.assertThat(actual).isNull();
}
{
- Predicate actual = defaultValueAssiger.handlePredicate(null);
+ Predicate actual = defaultValueAssigner.handlePredicate(null);
Assertions.assertThat(actual).isNull();
}
{
Predicate actual =
- defaultValueAssiger.handlePredicate(
+ defaultValueAssigner.handlePredicate(
predicateBuilder.equal(predicateBuilder.indexOf("col1"), "1"));
assertEquals(actual, predicateBuilder.equal(predicateBuilder.indexOf("col1"), "1"));
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index b908e275a..8d6a61168 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -189,14 +189,19 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
@Test
public void testSortSpillMerge() {
sql(
- "CREATE TABLE IF NOT EXISTS KT (a INT PRIMARY KEY NOT ENFORCED, b INT) WITH ('sort-spill-threshold'='2')");
- sql("INSERT INTO KT VALUES (1, 1)");
- sql("INSERT INTO KT VALUES (1, 2)");
- sql("INSERT INTO KT VALUES (1, 3)");
- sql("INSERT INTO KT VALUES (1, 4)");
- sql("INSERT INTO KT VALUES (1, 5)");
- sql("INSERT INTO KT VALUES (1, 6)");
- sql("INSERT INTO KT VALUES (1, 7)");
- assertThat(sql("SELECT * FROM KT")).containsExactlyInAnyOrder(Row.of(1, 7));
+ "CREATE TABLE IF NOT EXISTS KT (a INT PRIMARY KEY NOT ENFORCED, b STRING) WITH ('sort-spill-threshold'='2')");
+ sql("INSERT INTO KT VALUES (1, '1')");
+ sql("INSERT INTO KT VALUES (1, '2')");
+ sql("INSERT INTO KT VALUES (1, '3')");
+ sql("INSERT INTO KT VALUES (1, '4')");
+ sql("INSERT INTO KT VALUES (1, '5')");
+ sql("INSERT INTO KT VALUES (1, '6')");
+ sql("INSERT INTO KT VALUES (1, '7')");
+
+ // select all
+ assertThat(sql("SELECT * FROM KT")).containsExactlyInAnyOrder(Row.of(1, "7"));
+
+ // select projection
+ assertThat(sql("SELECT b FROM KT")).containsExactlyInAnyOrder(Row.of("7"));
}
}