This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 893ce76be [bug] Fix sort-spill-threshold with projection and refactor 
Default values (#1572)
893ce76be is described below

commit 893ce76bee20c6226a47d2631d4cedaf7e514114
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jul 17 13:18:10 2023 +0800

    [bug] Fix sort-spill-threshold with projection and refactor Default values 
(#1572)
---
 .../main/java/org/apache/paimon/CoreOptions.java   |  12 +--
 .../org/apache/paimon/casting/DefaultValueRow.java |   7 +-
 .../paimon/io/KeyValueFileReaderFactory.java       |   4 +
 .../org/apache/paimon/mergetree/MergeSorter.java   |   6 +-
 ...ValueAssiger.java => DefaultValueAssigner.java} | 103 +++++++++++----------
 .../paimon/operation/KeyValueFileStoreRead.java    |   1 +
 .../org/apache/paimon/schema/SchemaValidation.java |   2 +-
 .../paimon/table/AbstractFileStoreTable.java       |  32 ++++---
 .../paimon/table/AppendOnlyFileStoreTable.java     |   7 ++
 .../table/source/InnerStreamTableScanImpl.java     |  11 +--
 .../paimon/table/source/InnerTableScanImpl.java    |  12 +--
 .../org/apache/paimon/table/source/TableRead.java  |   4 +-
 .../table/source/snapshot/SnapshotReaderImpl.java  |  11 +--
 .../apache/paimon/table/system/AuditLogTable.java  |   8 ++
 .../apache/paimon/table/system/BucketsTable.java   |   7 ++
 .../apache/paimon/table/system/ConsumersTable.java |   6 ++
 .../org/apache/paimon/table/system/FilesTable.java |   7 ++
 .../apache/paimon/table/system/ManifestsTable.java |   7 ++
 .../apache/paimon/table/system/OptionsTable.java   |   7 ++
 .../apache/paimon/table/system/SchemasTable.java   |   6 ++
 .../apache/paimon/table/system/SnapshotsTable.java |   7 ++
 .../org/apache/paimon/table/system/TagsTable.java  |   7 ++
 ...igerTest.java => DefaultValueAssignerTest.java} |  22 ++---
 .../apache/paimon/flink/BatchFileStoreITCase.java  |  23 +++--
 24 files changed, 207 insertions(+), 112 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index c979a19c0..4d16f7554 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1101,18 +1101,18 @@ public class CoreOptions implements Serializable {
         return options.get(METASTORE_PARTITIONED_TABLE);
     }
 
-    public Options getFieldDefaultValues() {
-        Map<String, String> defultValues = new HashMap<>();
+    public Map<String, String> getFieldDefaultValues() {
+        Map<String, String> defaultValues = new HashMap<>();
+        String fieldPrefix = FIELDS_PREFIX + ".";
+        String defaultValueSuffix = "." + DEFAULT_VALUE_SUFFIX;
         for (Map.Entry<String, String> option : options.toMap().entrySet()) {
             String key = option.getKey();
-            String fieldPrefix = FIELDS_PREFIX + ".";
-            String defaultValueSuffix = "." + DEFAULT_VALUE_SUFFIX;
             if (key != null && key.startsWith(fieldPrefix) && 
key.endsWith(defaultValueSuffix)) {
                 String fieldName = key.replace(fieldPrefix, 
"").replace(defaultValueSuffix, "");
-                defultValues.put(fieldName, option.getValue());
+                defaultValues.put(fieldName, option.getValue());
             }
         }
-        return new Options(defultValues);
+        return defaultValues;
     }
 
     public List<CommitCallback> commitCallbacks() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/casting/DefaultValueRow.java 
b/paimon-core/src/main/java/org/apache/paimon/casting/DefaultValueRow.java
index 10f2e5cd1..60dae11a7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/casting/DefaultValueRow.java
+++ b/paimon-core/src/main/java/org/apache/paimon/casting/DefaultValueRow.java
@@ -31,9 +31,10 @@ import org.apache.paimon.types.RowKind;
  * InternalRow}.
  */
 public class DefaultValueRow implements InternalRow {
+
     private InternalRow row;
 
-    private InternalRow defaultValueRow;
+    private final InternalRow defaultValueRow;
 
     private DefaultValueRow(InternalRow defaultValueRow) {
         this.defaultValueRow = defaultValueRow;
@@ -44,6 +45,10 @@ public class DefaultValueRow implements InternalRow {
         return this;
     }
 
+    public InternalRow defaultValueRow() {
+        return defaultValueRow;
+    }
+
     @Override
     public int getFieldCount() {
         return row.getFieldCount();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 75de5709d..e9413715f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -169,6 +169,10 @@ public class KeyValueFileReaderFactory {
             return this;
         }
 
+        public RowType projectedValueType() {
+            return projectedValueType;
+        }
+
         public KeyValueFileReaderFactory build(BinaryRow partition, int 
bucket) {
             return build(partition, bucket, true, Collections.emptyList());
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
index 9f5b34e12..a63c1d21c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
@@ -67,7 +67,7 @@ import static 
org.apache.paimon.schema.SystemColumns.VALUE_KIND;
 public class MergeSorter {
 
     private final RowType keyType;
-    private final RowType valueType;
+    private RowType valueType;
 
     private final SortEngine sortEngine;
     private final int spillThreshold;
@@ -100,6 +100,10 @@ public class MergeSorter {
         this.ioManager = ioManager;
     }
 
+    public void setProjectedValueType(RowType projectedType) {
+        this.valueType = projectedType;
+    }
+
     public <T> RecordReader<T> mergeSort(
             List<ReaderSupplier<KeyValue>> lazyReaders,
             Comparator<InternalRow> keyComparator,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssiger.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
similarity index 57%
rename from 
paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssiger.java
rename to 
paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
index adaa645fe..730aafe8c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssiger.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.operation;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.casting.CastExecutor;
 import org.apache.paimon.casting.CastExecutors;
 import org.apache.paimon.casting.DefaultValueRow;
@@ -41,86 +42,82 @@ import java.util.Map;
 import java.util.Optional;
 
 /**
- * the field Default value assigner. note that the invoke of assigning should 
be after merge and
- * schema evolution
+ * The field Default value assigner. note that invoke of assigning should be 
after merge and schema
+ * evolution.
  */
-public class DefaultValueAssiger {
+public class DefaultValueAssigner {
 
-    private GenericRow defaultValueMapping;
-    private TableSchema tableSchema;
+    private final RowType rowType;
+    private final Map<String, String> defaultValues;
 
-    private Map<String, String> defaultValues;
+    private boolean needToAssign;
 
     private int[][] project;
+    private DefaultValueRow defaultValueRow;
 
-    private boolean isCacheDefaultMapping;
-
-    public DefaultValueAssiger(TableSchema tableSchema) {
-        this.tableSchema = tableSchema;
-
-        CoreOptions coreOptions = new CoreOptions(tableSchema.options());
-        defaultValues = coreOptions.getFieldDefaultValues().toMap();
+    private DefaultValueAssigner(Map<String, String> defaultValues, RowType 
rowType) {
+        this.defaultValues = defaultValues;
+        this.needToAssign = defaultValues.size() > 0;
+        this.rowType = rowType;
     }
 
-    public DefaultValueAssiger handleProject(int[][] project) {
+    public DefaultValueAssigner handleProject(int[][] project) {
         this.project = project;
+        if (project != null) {
+            List<String> projected = 
Projection.of(project).project(rowType).getFieldNames();
+            needToAssign = 
defaultValues.keySet().stream().anyMatch(projected::contains);
+        }
         return this;
     }
 
-    /** assign default value for colomn which value is null. */
+    public boolean needToAssign() {
+        return needToAssign;
+    }
+
+    /** assign default value for column which value is null. */
     public RecordReader<InternalRow> 
assignFieldsDefaultValue(RecordReader<InternalRow> reader) {
-        if (defaultValues.isEmpty()) {
+        if (!needToAssign) {
             return reader;
         }
 
-        if (!isCacheDefaultMapping) {
-            isCacheDefaultMapping = true;
-            this.defaultValueMapping = createDefaultValueMapping();
+        if (defaultValueRow == null) {
+            defaultValueRow = createDefaultValueRow();
         }
 
-        RecordReader<InternalRow> result = reader;
-        if (defaultValueMapping != null) {
-            DefaultValueRow defaultValueRow = 
DefaultValueRow.from(defaultValueMapping);
-            result = reader.transform(defaultValueRow::replaceRow);
-        }
-        return result;
+        return reader.transform(defaultValueRow::replaceRow);
     }
 
-    GenericRow createDefaultValueMapping() {
-
-        RowType valueType = tableSchema.logicalRowType();
-
+    @VisibleForTesting
+    DefaultValueRow createDefaultValueRow() {
         List<DataField> fields;
         if (project != null) {
-            fields = Projection.of(project).project(valueType).getFields();
+            fields = Projection.of(project).project(rowType).getFields();
         } else {
-            fields = valueType.getFields();
+            fields = rowType.getFields();
         }
 
-        GenericRow defaultValuesMa = null;
-        if (!fields.isEmpty()) {
-            defaultValuesMa = new GenericRow(fields.size());
-            for (int i = 0; i < fields.size(); i++) {
-                DataField dataField = fields.get(i);
-                String defaultValueStr = defaultValues.get(dataField.name());
-                if (defaultValueStr == null) {
-                    continue;
-                }
+        GenericRow row = new GenericRow(fields.size());
+        for (int i = 0; i < fields.size(); i++) {
+            DataField dataField = fields.get(i);
+            String defaultValueStr = defaultValues.get(dataField.name());
+            if (defaultValueStr == null) {
+                continue;
+            }
 
-                CastExecutor<Object, Object> resolve =
-                        (CastExecutor<Object, Object>)
-                                CastExecutors.resolve(VarCharType.STRING_TYPE, 
dataField.type());
+            @SuppressWarnings("unchecked")
+            CastExecutor<Object, Object> resolve =
+                    (CastExecutor<Object, Object>)
+                            CastExecutors.resolve(VarCharType.STRING_TYPE, 
dataField.type());
 
-                if (resolve == null) {
-                    throw new RuntimeException(
-                            "Default value do not support the type of " + 
dataField.type());
-                }
-                Object defaultValue = 
resolve.cast(BinaryString.fromString(defaultValueStr));
-                defaultValuesMa.setField(i, defaultValue);
+            if (resolve == null) {
+                throw new RuntimeException(
+                        "Default value do not support the type of " + 
dataField.type());
             }
+            Object defaultValue = 
resolve.cast(BinaryString.fromString(defaultValueStr));
+            row.setField(i, defaultValue);
         }
 
-        return defaultValuesMa;
+        return DefaultValueRow.from(row);
     }
 
     public Predicate handlePredicate(Predicate filters) {
@@ -154,4 +151,10 @@ public class DefaultValueAssiger {
         }
         return result;
     }
+
+    public static DefaultValueAssigner create(TableSchema schema) {
+        CoreOptions coreOptions = new CoreOptions(schema.options());
+        Map<String, String> defaultValues = 
coreOptions.getFieldDefaultValues();
+        return new DefaultValueAssigner(defaultValues, 
schema.logicalRowType());
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
index 3a666e709..0a2a31dcf 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
@@ -127,6 +127,7 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
         this.outerProjection = projection.outerProjection;
         if (pushdownProjection != null) {
             readerFactoryBuilder.withValueProjection(pushdownProjection);
+            
mergeSorter.setProjectedValueType(readerFactoryBuilder.projectedValueType());
         }
         return this;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index b04554a83..df2d2c11a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -297,7 +297,7 @@ public class SchemaValidation {
 
     private static void validateDefaultValues(TableSchema schema) {
         CoreOptions coreOptions = new CoreOptions(schema.options());
-        Map<String, String> defaultValues = 
coreOptions.getFieldDefaultValues().toMap();
+        Map<String, String> defaultValues = 
coreOptions.getFieldDefaultValues();
 
         if (!defaultValues.isEmpty()) {
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 5b042b228..3dfeabc8b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -22,11 +22,12 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.metastore.AddPartitionCommitCallback;
 import org.apache.paimon.metastore.MetastoreClient;
-import org.apache.paimon.operation.DefaultValueAssiger;
+import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.options.Options;
@@ -48,6 +49,7 @@ import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.InnerTableScanImpl;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.SplitGenerator;
+import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
 import 
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
@@ -126,7 +128,7 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
                 snapshotManager(),
                 splitGenerator(),
                 nonPartitionFilterConsumer(),
-                new DefaultValueAssiger(tableSchema));
+                DefaultValueAssigner.create(tableSchema));
     }
 
     @Override
@@ -135,7 +137,7 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
                 coreOptions(),
                 newSnapshotReader(),
                 snapshotManager(),
-                new DefaultValueAssiger(tableSchema));
+                DefaultValueAssigner.create(tableSchema));
     }
 
     @Override
@@ -145,7 +147,7 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
                 newSnapshotReader(),
                 snapshotManager(),
                 supportStreamingReadOverwrite(),
-                new DefaultValueAssiger(tableSchema));
+                DefaultValueAssigner.create(tableSchema));
     }
 
     public abstract SplitGenerator splitGenerator();
@@ -319,27 +321,35 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
     @Override
     public InnerTableRead newRead() {
         InnerTableRead innerTableRead = innerRead();
-        DefaultValueAssiger defaultValueAssiger = new 
DefaultValueAssiger(tableSchema);
+        DefaultValueAssigner defaultValueAssigner = 
DefaultValueAssigner.create(tableSchema);
+        if (!defaultValueAssigner.needToAssign()) {
+            return innerTableRead;
+        }
+
         return new InnerTableRead() {
             @Override
             public InnerTableRead withFilter(Predicate predicate) {
-                
innerTableRead.withFilter(defaultValueAssiger.handlePredicate(predicate));
+                
innerTableRead.withFilter(defaultValueAssigner.handlePredicate(predicate));
                 return this;
             }
 
             @Override
             public InnerTableRead withProjection(int[][] projection) {
-                defaultValueAssiger.handleProject(projection);
+                defaultValueAssigner.handleProject(projection);
                 innerTableRead.withProjection(projection);
                 return this;
             }
 
+            @Override
+            public TableRead withIOManager(IOManager ioManager) {
+                innerTableRead.withIOManager(ioManager);
+                return this;
+            }
+
             @Override
             public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
-                RecordReader<InternalRow> reader =
-                        defaultValueAssiger.assignFieldsDefaultValue(
-                                innerTableRead.createReader(split));
-                return reader;
+                return defaultValueAssigner.assignFieldsDefaultValue(
+                        innerTableRead.createReader(split));
             }
 
             @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 32f4a160a..787dbeff1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -22,6 +22,7 @@ import org.apache.paimon.AppendOnlyFileStore;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.WriteMode;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.ManifestCacheFilter;
@@ -40,6 +41,7 @@ import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.SplitGenerator;
+import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.Preconditions;
 
@@ -128,6 +130,11 @@ public class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
                 return this;
             }
 
+            @Override
+            public TableRead withIOManager(IOManager ioManager) {
+                return this;
+            }
+
             @Override
             public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
                 return read.createReader((DataSplit) split);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
index 9b50e7831..3040ee5b7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
@@ -21,7 +21,7 @@ package org.apache.paimon.table.source;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.consumer.Consumer;
-import org.apache.paimon.operation.DefaultValueAssiger;
+import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.snapshot.BoundedChecker;
 import 
org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner;
@@ -51,6 +51,7 @@ public class InnerStreamTableScanImpl extends 
AbstractInnerTableScan
     private final CoreOptions options;
     private final SnapshotManager snapshotManager;
     private final boolean supportStreamingReadOverwrite;
+    private final DefaultValueAssigner defaultValueAssigner;
 
     private StartingScanner startingScanner;
     private FollowUpScanner followUpScanner;
@@ -59,24 +60,22 @@ public class InnerStreamTableScanImpl extends 
AbstractInnerTableScan
     @Nullable private Long currentWatermark;
     @Nullable private Long nextSnapshotId;
 
-    private DefaultValueAssiger defaultValueAssiger;
-
     public InnerStreamTableScanImpl(
             CoreOptions options,
             SnapshotReader snapshotReader,
             SnapshotManager snapshotManager,
             boolean supportStreamingReadOverwrite,
-            DefaultValueAssiger defaultValueAssiger) {
+            DefaultValueAssigner defaultValueAssigner) {
         super(options, snapshotReader);
         this.options = options;
         this.snapshotManager = snapshotManager;
         this.supportStreamingReadOverwrite = supportStreamingReadOverwrite;
-        this.defaultValueAssiger = defaultValueAssiger;
+        this.defaultValueAssigner = defaultValueAssigner;
     }
 
     @Override
     public InnerStreamTableScanImpl withFilter(Predicate predicate) {
-        
snapshotReader.withFilter(defaultValueAssiger.handlePredicate(predicate));
+        
snapshotReader.withFilter(defaultValueAssigner.handlePredicate(predicate));
         return this;
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
index caa8fbaf3..ea19a1ab9 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.table.source;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.operation.DefaultValueAssiger;
+import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
@@ -29,27 +29,25 @@ import org.apache.paimon.utils.SnapshotManager;
 public class InnerTableScanImpl extends AbstractInnerTableScan {
 
     private final SnapshotManager snapshotManager;
+    private final DefaultValueAssigner defaultValueAssigner;
 
     private StartingScanner startingScanner;
-
     private boolean hasNext;
 
-    private DefaultValueAssiger defaultValueAssiger;
-
     public InnerTableScanImpl(
             CoreOptions options,
             SnapshotReader snapshotReader,
             SnapshotManager snapshotManager,
-            DefaultValueAssiger defaultValueAssiger) {
+            DefaultValueAssigner defaultValueAssigner) {
         super(options, snapshotReader);
         this.snapshotManager = snapshotManager;
         this.hasNext = true;
-        this.defaultValueAssiger = defaultValueAssiger;
+        this.defaultValueAssigner = defaultValueAssigner;
     }
 
     @Override
     public InnerTableScan withFilter(Predicate predicate) {
-        
snapshotReader.withFilter(defaultValueAssiger.handlePredicate(predicate));
+        
snapshotReader.withFilter(defaultValueAssigner.handlePredicate(predicate));
         return this;
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
index 8ba6eed3b..adc8dbed9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java
@@ -37,9 +37,7 @@ import java.util.List;
 @Public
 public interface TableRead {
 
-    default TableRead withIOManager(IOManager ioManager) {
-        return this;
-    }
+    TableRead withIOManager(IOManager ioManager);
 
     RecordReader<InternalRow> createReader(Split split) throws IOException;
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 3c17bb51d..848c40175 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -28,7 +28,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.operation.DefaultValueAssiger;
+import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.ScanKind;
 import org.apache.paimon.predicate.Predicate;
@@ -67,12 +67,11 @@ public class SnapshotReaderImpl implements SnapshotReader {
     private final ConsumerManager consumerManager;
     private final SplitGenerator splitGenerator;
     private final BiConsumer<FileStoreScan, Predicate> 
nonPartitionFilterConsumer;
+    private final DefaultValueAssigner defaultValueAssigner;
 
     private ScanKind scanKind = ScanKind.ALL;
     private RecordComparator lazyPartitionComparator;
 
-    private DefaultValueAssiger defaultValueAssiger;
-
     public SnapshotReaderImpl(
             FileStoreScan scan,
             TableSchema tableSchema,
@@ -80,7 +79,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
             SnapshotManager snapshotManager,
             SplitGenerator splitGenerator,
             BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer,
-            DefaultValueAssiger defaultValueAssiger) {
+            DefaultValueAssigner defaultValueAssigner) {
         this.scan = scan;
         this.tableSchema = tableSchema;
         this.options = options;
@@ -89,7 +88,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
                 new ConsumerManager(snapshotManager.fileIO(), 
snapshotManager.tablePath());
         this.splitGenerator = splitGenerator;
         this.nonPartitionFilterConsumer = nonPartitionFilterConsumer;
-        this.defaultValueAssiger = defaultValueAssiger;
+        this.defaultValueAssigner = defaultValueAssigner;
     }
 
     @Override
@@ -130,7 +129,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
         List<Predicate> partitionFilters = new ArrayList<>();
         List<Predicate> nonPartitionFilters = new ArrayList<>();
         for (Predicate p :
-                
PredicateBuilder.splitAnd(defaultValueAssiger.handlePredicate(predicate))) {
+                
PredicateBuilder.splitAnd(defaultValueAssigner.handlePredicate(predicate))) {
             Optional<Predicate> mapped = transformFieldMapping(p, 
fieldIdxToPartitionIdx);
             if (mapped.isPresent()) {
                 partitionFilters.add(mapped.get());
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 5750bb822..cc1f98f4b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -24,6 +24,7 @@ import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.operation.ScanKind;
@@ -41,6 +42,7 @@ import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.SplitGenerator;
+import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowKind;
@@ -382,6 +384,12 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
             return this;
         }
 
+        @Override
+        public TableRead withIOManager(IOManager ioManager) {
+            this.dataRead.withIOManager(ioManager);
+            return this;
+        }
+
         @Override
         public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
             return dataRead.createReader(split).transform(this::convertRow);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
index c8aea53f5..971b97d3c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.system;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.io.DataFileMeta;
@@ -35,6 +36,7 @@ import org.apache.paimon.table.source.InnerStreamTableScan;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
@@ -154,6 +156,11 @@ public class BucketsTable implements DataTable, 
ReadonlyTable {
             throw new UnsupportedOperationException("BucketsRead does not 
support projection");
         }
 
+        @Override
+        public TableRead withIOManager(IOManager ioManager) {
+            return this;
+        }
+
         @Override
         public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
             if (!(split instanceof DataSplit)) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
index 570dc4683..896acfccc 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ConsumersTable.java
@@ -22,6 +22,7 @@ import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.predicate.Predicate;
@@ -175,6 +176,11 @@ public class ConsumersTable implements ReadonlyTable {
             return this;
         }
 
+        @Override
+        public TableRead withIOManager(IOManager ioManager) {
+            return this;
+        }
+
         @Override
         public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
             if (!(split instanceof ConsumersTable.ConsumersSplit)) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index 94858164f..c00900734 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.system;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.LazyGenericRow;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.format.FieldStats;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
@@ -39,6 +40,7 @@ import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadOnceTableScan;
 import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
@@ -217,6 +219,11 @@ public class FilesTable implements ReadonlyTable {
             return this;
         }
 
+        @Override
+        public TableRead withIOManager(IOManager ioManager) {
+            return this;
+        }
+
         @Override
         public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
             if (!(split instanceof FilesSplit)) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
index bad1a374a..9357cd64f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java
@@ -23,6 +23,7 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -36,6 +37,7 @@ import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadOnceTableScan;
 import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
@@ -188,6 +190,11 @@ public class ManifestsTable implements ReadonlyTable {
             return this;
         }
 
+        @Override
+        public TableRead withIOManager(IOManager ioManager) {
+            return this;
+        }
+
         @Override
         public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
             if (!(split instanceof ManifestsSplit)) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
index a104a79c8..81dfa094c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.system;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.predicate.Predicate;
@@ -32,6 +33,7 @@ import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadOnceTableScan;
 import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.IteratorRecordReader;
@@ -169,6 +171,11 @@ public class OptionsTable implements ReadonlyTable {
             return this;
         }
 
+        @Override
+        public TableRead withIOManager(IOManager ioManager) {
+            return this;
+        }
+
         @Override
         public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
             if (!(split instanceof OptionsSplit)) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
index 91d553755..eed7d4d7b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.system;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.predicate.Predicate;
@@ -181,6 +182,11 @@ public class SchemasTable implements ReadonlyTable {
             return this;
         }
 
+        @Override
+        public TableRead withIOManager(IOManager ioManager) {
+            return this;
+        }
+
         @Override
         public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
             if (!(split instanceof SchemasSplit)) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
index b4333f221..0a73bbebd 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.predicate.Predicate;
@@ -33,6 +34,7 @@ import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadOnceTableScan;
 import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
@@ -192,6 +194,11 @@ public class SnapshotsTable implements ReadonlyTable {
             return this;
         }
 
+        @Override
+        public TableRead withIOManager(IOManager ioManager) {
+            return this;
+        }
+
         @Override
         public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
             if (!(split instanceof SnapshotsSplit)) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
index a7b6ba447..eecea7d0e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.predicate.Predicate;
@@ -33,6 +34,7 @@ import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadOnceTableScan;
 import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
@@ -178,6 +180,11 @@ public class TagsTable implements ReadonlyTable {
             return this;
         }
 
+        @Override
+        public TableRead withIOManager(IOManager ioManager) {
+            return this;
+        }
+
         @Override
         public RecordReader<InternalRow> createReader(Split split) {
             if (!(split instanceof TagsSplit)) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssigerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssignerTest.java
similarity index 86%
rename from 
paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssigerTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssignerTest.java
index 608964ad0..99864b31a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssigerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssignerTest.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.operation;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.predicate.Predicate;
@@ -44,7 +44,7 @@ import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-class DefaultValueAssigerTest {
+class DefaultValueAssignerTest {
     @TempDir java.nio.file.Path tempDir;
 
     private TableSchema tableSchema;
@@ -82,20 +82,20 @@ class DefaultValueAssigerTest {
 
     @Test
     public void testGeneralRow() {
-        DefaultValueAssiger defaultValueAssiger = new 
DefaultValueAssiger(tableSchema);
+        DefaultValueAssigner defaultValueAssigner = 
DefaultValueAssigner.create(tableSchema);
         int[] projection = tableSchema.projection(Lists.newArrayList("col5", 
"col4", "col0"));
         Projection top = Projection.of(projection);
         int[][] nest = top.toNestedIndexes();
-        defaultValueAssiger = defaultValueAssiger.handleProject(nest);
-        GenericRow row = defaultValueAssiger.createDefaultValueMapping();
+        defaultValueAssigner = defaultValueAssigner.handleProject(nest);
+        InternalRow row = 
defaultValueAssigner.createDefaultValueRow().defaultValueRow();
         assertEquals(
                 "1|0|null",
                 String.format("%s|%s|%s", row.getString(0), row.getString(1), 
row.getString(2)));
     }
 
     @Test
-    public void testHanldePredicate() {
-        DefaultValueAssiger defaultValueAssiger = new 
DefaultValueAssiger(tableSchema);
+    public void testHandlePredicate() {
+        DefaultValueAssigner defaultValueAssigner = 
DefaultValueAssigner.create(tableSchema);
         PredicateBuilder predicateBuilder = new 
PredicateBuilder(tableSchema.logicalRowType());
 
         {
@@ -103,7 +103,7 @@ class DefaultValueAssigerTest {
                     PredicateBuilder.and(
                             
predicateBuilder.equal(predicateBuilder.indexOf("col5"), "100"),
                             
predicateBuilder.equal(predicateBuilder.indexOf("col1"), "1"));
-            Predicate actual = defaultValueAssiger.handlePredicate(predicate);
+            Predicate actual = defaultValueAssigner.handlePredicate(predicate);
             assertEquals(actual, 
predicateBuilder.equal(predicateBuilder.indexOf("col1"), "1"));
         }
 
@@ -112,18 +112,18 @@ class DefaultValueAssigerTest {
                     PredicateBuilder.and(
                             
predicateBuilder.equal(predicateBuilder.indexOf("col5"), "100"),
                             
predicateBuilder.equal(predicateBuilder.indexOf("col4"), "1"));
-            Predicate actual = defaultValueAssiger.handlePredicate(predicate);
+            Predicate actual = defaultValueAssigner.handlePredicate(predicate);
             Assertions.assertThat(actual).isNull();
         }
 
         {
-            Predicate actual = defaultValueAssiger.handlePredicate(null);
+            Predicate actual = defaultValueAssigner.handlePredicate(null);
             Assertions.assertThat(actual).isNull();
         }
 
         {
             Predicate actual =
-                    defaultValueAssiger.handlePredicate(
+                    defaultValueAssigner.handlePredicate(
                             
predicateBuilder.equal(predicateBuilder.indexOf("col1"), "1"));
             assertEquals(actual, 
predicateBuilder.equal(predicateBuilder.indexOf("col1"), "1"));
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index b908e275a..8d6a61168 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -189,14 +189,19 @@ public class BatchFileStoreITCase extends 
CatalogITCaseBase {
     @Test
     public void testSortSpillMerge() {
         sql(
-                "CREATE TABLE IF NOT EXISTS KT (a INT PRIMARY KEY NOT 
ENFORCED, b INT) WITH ('sort-spill-threshold'='2')");
-        sql("INSERT INTO KT VALUES (1, 1)");
-        sql("INSERT INTO KT VALUES (1, 2)");
-        sql("INSERT INTO KT VALUES (1, 3)");
-        sql("INSERT INTO KT VALUES (1, 4)");
-        sql("INSERT INTO KT VALUES (1, 5)");
-        sql("INSERT INTO KT VALUES (1, 6)");
-        sql("INSERT INTO KT VALUES (1, 7)");
-        assertThat(sql("SELECT * FROM 
KT")).containsExactlyInAnyOrder(Row.of(1, 7));
+                "CREATE TABLE IF NOT EXISTS KT (a INT PRIMARY KEY NOT 
ENFORCED, b STRING) WITH ('sort-spill-threshold'='2')");
+        sql("INSERT INTO KT VALUES (1, '1')");
+        sql("INSERT INTO KT VALUES (1, '2')");
+        sql("INSERT INTO KT VALUES (1, '3')");
+        sql("INSERT INTO KT VALUES (1, '4')");
+        sql("INSERT INTO KT VALUES (1, '5')");
+        sql("INSERT INTO KT VALUES (1, '6')");
+        sql("INSERT INTO KT VALUES (1, '7')");
+
+        // select all
+        assertThat(sql("SELECT * FROM 
KT")).containsExactlyInAnyOrder(Row.of(1, "7"));
+
+        // select projection
+        assertThat(sql("SELECT b FROM 
KT")).containsExactlyInAnyOrder(Row.of("7"));
     }
 }


Reply via email to