This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6b0ddd7f1b [core] Remove old default value for read side (#5804)
6b0ddd7f1b is described below
commit 6b0ddd7f1b828d6fb564c609e15520209cb9e997
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jun 27 15:10:23 2025 +0800
[core] Remove old default value for read side (#5804)
---
.../paimon/operation/DefaultValueAssigner.java | 175 ---------------------
.../privilege/FileBasedPrivilegeManager.java | 1 +
.../paimon/table/AbstractFileStoreTable.java | 2 -
.../paimon/table/source/AbstractDataTableRead.java | 11 --
.../paimon/table/source/DataTableBatchScan.java | 6 +-
.../paimon/table/source/DataTableStreamScan.java | 5 +-
.../table/source/snapshot/SnapshotReaderImpl.java | 7 +-
.../paimon/operation/DefaultValueAssignerTest.java | 135 ----------------
.../paimon/table/PrimaryKeySimpleTableTest.java | 51 ------
.../source/snapshot/DefaultValueScannerTest.java | 92 -----------
.../apache/paimon/flink/ReadWriteTableITCase.java | 80 ----------
11 files changed, 4 insertions(+), 561 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
b/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
deleted file mode 100644
index 416237b60b..0000000000
---
a/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.operation;
-
-import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.casting.CastExecutor;
-import org.apache.paimon.casting.CastExecutors;
-import org.apache.paimon.casting.DefaultValueRow;
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.predicate.PredicateReplaceVisitor;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.VarCharType;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
-
-/**
- * The field Default value assigner. note that invoke of assigning should be
after merge and schema
- * evolution.
- *
- * @deprecated default value in reading is not recommended
- */
-@Deprecated
-public class DefaultValueAssigner {
-
- public static final String DEFAULT_VALUE_SUFFIX = "default-value";
-
- private final RowType rowType;
- private final Map<String, String> defaultValues;
-
- private boolean needToAssign;
-
- private @Nullable RowType readRowType;
- private DefaultValueRow defaultValueRow;
-
- private DefaultValueAssigner(Map<String, String> defaultValues, RowType
rowType) {
- this.defaultValues = defaultValues;
- this.needToAssign = !defaultValues.isEmpty();
- this.rowType = rowType;
- }
-
- public DefaultValueAssigner handleReadRowType(RowType readRowType) {
- this.readRowType = readRowType;
- List<String> requiredFieldNames = readRowType.getFieldNames();
- needToAssign =
defaultValues.keySet().stream().anyMatch(requiredFieldNames::contains);
- return this;
- }
-
- /** assign default value for column which value is null. */
- public RecordReader<InternalRow>
assignFieldsDefaultValue(RecordReader<InternalRow> reader) {
- if (!needToAssign) {
- return reader;
- }
-
- if (defaultValueRow == null) {
- defaultValueRow = createDefaultValueRow();
- }
-
- return reader.transform(defaultValueRow::replaceRow);
- }
-
- @VisibleForTesting
- DefaultValueRow createDefaultValueRow() {
- List<DataField> fields;
- if (readRowType != null) {
- fields = readRowType.getFields();
- } else {
- fields = rowType.getFields();
- }
-
- GenericRow row = new GenericRow(fields.size());
- for (int i = 0; i < fields.size(); i++) {
- DataField dataField = fields.get(i);
- String defaultValueStr = defaultValues.get(dataField.name());
- if (defaultValueStr == null) {
- continue;
- }
-
- @SuppressWarnings("unchecked")
- CastExecutor<Object, Object> resolve =
- (CastExecutor<Object, Object>)
- CastExecutors.resolve(VarCharType.STRING_TYPE,
dataField.type());
-
- if (resolve == null) {
- throw new RuntimeException(
- "Default value do not support the type of " +
dataField.type());
- }
- Object defaultValue =
resolve.cast(BinaryString.fromString(defaultValueStr));
- row.setField(i, defaultValue);
- }
-
- return DefaultValueRow.from(row);
- }
-
- public Predicate handlePredicate(Predicate filters) {
- Predicate result = filters;
- if (!defaultValues.isEmpty()) {
- if (filters != null) {
- // TODO improve predicate tree with replacing always true and
always false
- PredicateReplaceVisitor deletePredicateWithFieldNameVisitor =
- predicate -> {
- if
(defaultValues.containsKey(predicate.fieldName())) {
- return Optional.empty();
- }
- return Optional.of(predicate);
- };
-
- ArrayList<Predicate> filterWithoutDefaultValueField = new
ArrayList<>();
-
- List<Predicate> predicates =
PredicateBuilder.splitAnd(filters);
- for (Predicate predicate : predicates) {
- predicate
- .visit(deletePredicateWithFieldNameVisitor)
- .ifPresent(filterWithoutDefaultValueField::add);
- }
-
- if (!filterWithoutDefaultValueField.isEmpty()) {
- result =
PredicateBuilder.and(filterWithoutDefaultValueField);
- } else {
- result = null;
- }
- }
- }
- return result;
- }
-
- public static DefaultValueAssigner create(TableSchema schema) {
- Map<String, String> defaultValues =
getFieldDefaultValues(schema.options());
- return new DefaultValueAssigner(defaultValues,
schema.logicalRowType());
- }
-
- private static Map<String, String> getFieldDefaultValues(Map<String,
String> options) {
- Map<String, String> defaultValues = new HashMap<>();
- String fieldPrefix = FIELDS_PREFIX + ".";
- String defaultValueSuffix = "." + DEFAULT_VALUE_SUFFIX;
- for (Map.Entry<String, String> option : options.entrySet()) {
- String key = option.getKey();
- if (key != null && key.startsWith(fieldPrefix) &&
key.endsWith(defaultValueSuffix)) {
- String fieldName = key.replace(fieldPrefix,
"").replace(defaultValueSuffix, "");
- defaultValues.put(fieldName, option.getValue());
- }
- }
- return defaultValues;
- }
-}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/FileBasedPrivilegeManager.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/FileBasedPrivilegeManager.java
index ec86e0cd17..3f4d89466e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/privilege/FileBasedPrivilegeManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/FileBasedPrivilegeManager.java
@@ -384,6 +384,7 @@ public class FileBasedPrivilegeManager implements
PrivilegeManager {
private void createUserTable() {
Options options = new Options();
options.set(CoreOptions.BUCKET, 1);
+ options.set(CoreOptions.FILE_FORMAT, "avro");
Path tableRoot = new Path(warehouse, USER_TABLE_DIR);
SchemaManager schemaManager = new SchemaManager(fileIO, tableRoot);
try {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index daa00f295a..7ea19d9f02 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -27,7 +27,6 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
-import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.options.ExpireConfig;
import org.apache.paimon.options.Options;
@@ -259,7 +258,6 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
changelogManager(),
splitGenerator(),
nonPartitionFilterConsumer(),
- DefaultValueAssigner.create(tableSchema),
store().pathFactory(),
name(),
store().newIndexFileHandler());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
index da2ce8cb56..fe73d3c251 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table.source;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateProjectionConverter;
import org.apache.paimon.reader.RecordReader;
@@ -33,8 +32,6 @@ import java.util.Optional;
/** A {@link InnerTableRead} for data table. */
public abstract class AbstractDataTableRead implements InnerTableRead {
- private final DefaultValueAssigner defaultValueAssigner;
-
private RowType readType;
private boolean executeFilter = false;
private Predicate predicate;
@@ -42,7 +39,6 @@ public abstract class AbstractDataTableRead implements
InnerTableRead {
public AbstractDataTableRead(TableSchema schema) {
this.schema = schema;
- this.defaultValueAssigner = schema == null ? null :
DefaultValueAssigner.create(schema);
}
public abstract void applyReadType(RowType readType);
@@ -57,9 +53,6 @@ public abstract class AbstractDataTableRead implements
InnerTableRead {
@Override
public final InnerTableRead withFilter(Predicate predicate) {
this.predicate = predicate;
- if (defaultValueAssigner != null) {
- predicate = defaultValueAssigner.handlePredicate(predicate);
- }
return innerWithFilter(predicate);
}
@@ -82,7 +75,6 @@ public abstract class AbstractDataTableRead implements
InnerTableRead {
@Override
public final InnerTableRead withReadType(RowType readType) {
this.readType = readType;
- this.defaultValueAssigner.handleReadRowType(readType);
applyReadType(readType);
return this;
}
@@ -90,9 +82,6 @@ public abstract class AbstractDataTableRead implements
InnerTableRead {
@Override
public final RecordReader<InternalRow> createReader(Split split) throws
IOException {
RecordReader<InternalRow> reader = reader(split);
- if (defaultValueAssigner != null) {
- reader = defaultValueAssigner.assignFieldsDefaultValue(reader);
- }
if (executeFilter) {
reader = executeFilter(reader);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index a1b94f59a5..0bf6927e4e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.manifest.PartitionEntry;
-import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.BucketMode;
@@ -34,8 +33,6 @@ import java.util.List;
/** {@link TableScan} implementation for batch planning. */
public class DataTableBatchScan extends AbstractDataTableScan {
- private final DefaultValueAssigner defaultValueAssigner;
-
private StartingScanner startingScanner;
private boolean hasNext;
@@ -49,7 +46,6 @@ public class DataTableBatchScan extends AbstractDataTableScan
{
super(schema, options, snapshotReader, queryAuth);
this.hasNext = true;
- this.defaultValueAssigner = DefaultValueAssigner.create(schema);
if (!schema.primaryKeys().isEmpty() && options.batchScanSkipLevel0()) {
snapshotReader.withLevelFilter(level -> level >
0).enableValueFilter();
@@ -62,7 +58,7 @@ public class DataTableBatchScan extends AbstractDataTableScan
{
@Override
public InnerTableScan withFilter(Predicate predicate) {
super.withFilter(predicate);
-
snapshotReader.withFilter(defaultValueAssigner.handlePredicate(predicate));
+ snapshotReader.withFilter(predicate);
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
index 2f42674296..a6ab782dba 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
@@ -24,7 +24,6 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.consumer.Consumer;
import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.manifest.PartitionEntry;
-import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.BucketMode;
@@ -62,7 +61,6 @@ public class DataTableStreamScan extends
AbstractDataTableScan implements Stream
private final StreamScanMode scanMode;
private final SnapshotManager snapshotManager;
private final boolean supportStreamingReadOverwrite;
- private final DefaultValueAssigner defaultValueAssigner;
private final NextSnapshotFetcher nextSnapshotProvider;
private final boolean hasPk;
@@ -92,7 +90,6 @@ public class DataTableStreamScan extends
AbstractDataTableScan implements Stream
this.scanMode =
options.toConfiguration().get(CoreOptions.STREAM_SCAN_MODE);
this.snapshotManager = snapshotManager;
this.supportStreamingReadOverwrite = supportStreamingReadOverwrite;
- this.defaultValueAssigner = DefaultValueAssigner.create(schema);
this.nextSnapshotProvider =
new NextSnapshotFetcher(
snapshotManager, changelogManager,
options.changelogLifecycleDecoupled());
@@ -107,7 +104,7 @@ public class DataTableStreamScan extends
AbstractDataTableScan implements Stream
@Override
public DataTableStreamScan withFilter(Predicate predicate) {
super.withFilter(predicate);
-
snapshotReader.withFilter(defaultValueAssigner.handlePredicate(predicate));
+ snapshotReader.withFilter(predicate);
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index d6658be267..5855e88ab1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -34,7 +34,6 @@ import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metrics.MetricRegistry;
-import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.ManifestsReader;
import org.apache.paimon.operation.metrics.ScanMetrics;
@@ -85,7 +84,6 @@ public class SnapshotReaderImpl implements SnapshotReader {
private final ConsumerManager consumerManager;
private final SplitGenerator splitGenerator;
private final BiConsumer<FileStoreScan, Predicate>
nonPartitionFilterConsumer;
- private final DefaultValueAssigner defaultValueAssigner;
private final FileStorePathFactory pathFactory;
private final String tableName;
private final IndexFileHandler indexFileHandler;
@@ -101,7 +99,6 @@ public class SnapshotReaderImpl implements SnapshotReader {
ChangelogManager changelogManager,
SplitGenerator splitGenerator,
BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer,
- DefaultValueAssigner defaultValueAssigner,
FileStorePathFactory pathFactory,
String tableName,
IndexFileHandler indexFileHandler) {
@@ -118,7 +115,6 @@ public class SnapshotReaderImpl implements SnapshotReader {
snapshotManager.branch());
this.splitGenerator = splitGenerator;
this.nonPartitionFilterConsumer = nonPartitionFilterConsumer;
- this.defaultValueAssigner = defaultValueAssigner;
this.pathFactory = pathFactory;
this.tableName = tableName;
@@ -218,8 +214,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
List<Predicate> partitionFilters = new ArrayList<>();
List<Predicate> nonPartitionFilters = new ArrayList<>();
- for (Predicate p :
-
PredicateBuilder.splitAnd(defaultValueAssigner.handlePredicate(predicate))) {
+ for (Predicate p : PredicateBuilder.splitAnd(predicate)) {
Optional<Predicate> mapped = transformFieldMapping(p,
fieldIdxToPartitionIdx);
if (mapped.isPresent()) {
partitionFilters.add(mapped.get());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssignerTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssignerTest.java
deleted file mode 100644
index 7cb31fba27..0000000000
---
a/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssignerTest.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.operation;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowType;
-
-import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
-
-import org.assertj.core.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-class DefaultValueAssignerTest {
- @TempDir java.nio.file.Path tempDir;
-
- private TableSchema tableSchema;
-
- @BeforeEach
- public void beforeEach() throws Exception {
- Path tablePath = new Path(tempDir.toUri());
- SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(),
tablePath);
- Map<String, String> options = new HashMap<>();
- options.put(
- String.format(
- "%s.%s.%s",
- CoreOptions.FIELDS_PREFIX,
- "col4",
- DefaultValueAssigner.DEFAULT_VALUE_SUFFIX),
- "0");
- options.put(
- String.format(
- "%s.%s.%s",
- CoreOptions.FIELDS_PREFIX,
- "col5",
- DefaultValueAssigner.DEFAULT_VALUE_SUFFIX),
- "1");
- Schema schema =
- new Schema(
- Lists.newArrayList(
- new DataField(0, "col0", DataTypes.STRING()),
- new DataField(1, "col1", DataTypes.STRING()),
- new DataField(2, "col2", DataTypes.STRING()),
- new DataField(3, "col3", DataTypes.STRING()),
- new DataField(4, "col4", DataTypes.STRING()),
- new DataField(5, "col5", DataTypes.STRING())),
- Lists.newArrayList("col0"),
- Collections.emptyList(),
- options,
- "");
- tableSchema = schemaManager.createTable(schema);
- }
-
- @Test
- public void testGeneralRow() {
- DefaultValueAssigner defaultValueAssigner =
DefaultValueAssigner.create(tableSchema);
- RowType readRowType =
- tableSchema.projectedLogicalRowType(Lists.newArrayList("col5",
"col4", "col0"));
- defaultValueAssigner =
defaultValueAssigner.handleReadRowType(readRowType);
- InternalRow row =
defaultValueAssigner.createDefaultValueRow().defaultValueRow();
- assertThat(String.format("%s|%s|%s", row.getString(0),
row.getString(1), row.getString(2)))
- .isEqualTo("1|0|null");
- }
-
- @Test
- public void testHandlePredicate() {
- DefaultValueAssigner defaultValueAssigner =
DefaultValueAssigner.create(tableSchema);
- PredicateBuilder predicateBuilder = new
PredicateBuilder(tableSchema.logicalRowType());
-
- {
- Predicate predicate =
- PredicateBuilder.and(
-
predicateBuilder.equal(predicateBuilder.indexOf("col5"), "100"),
-
predicateBuilder.equal(predicateBuilder.indexOf("col1"), "1"));
- Predicate actual = defaultValueAssigner.handlePredicate(predicate);
- assertThat(actual)
-
.isEqualTo(predicateBuilder.equal(predicateBuilder.indexOf("col1"), "1"));
- }
-
- {
- Predicate predicate =
- PredicateBuilder.and(
-
predicateBuilder.equal(predicateBuilder.indexOf("col5"), "100"),
-
predicateBuilder.equal(predicateBuilder.indexOf("col4"), "1"));
- Predicate actual = defaultValueAssigner.handlePredicate(predicate);
- Assertions.assertThat(actual).isNull();
- }
-
- {
- Predicate actual = defaultValueAssigner.handlePredicate(null);
- Assertions.assertThat(actual).isNull();
- }
-
- {
- Predicate actual =
- defaultValueAssigner.handlePredicate(
-
predicateBuilder.equal(predicateBuilder.indexOf("col1"), "1"));
- assertThat(actual)
-
.isEqualTo(predicateBuilder.equal(predicateBuilder.indexOf("col1"), "1"));
- }
- }
-}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 693c378a21..73208f290a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -36,7 +36,6 @@ import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.operation.AbstractFileStoreWrite;
-import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
@@ -1281,56 +1280,6 @@ public class PrimaryKeySimpleTableTest extends
SimpleTableTestBase {
commit.close();
}
- @Test
- public void testAuditLogWithDefaultValueFields() throws Exception {
- FileStoreTable table =
- createFileStoreTable(
- conf -> {
- conf.set(CHANGELOG_PRODUCER,
ChangelogProducer.INPUT);
- conf.set(
- String.format(
- "%s.%s.%s",
- CoreOptions.FIELDS_PREFIX,
- "b",
-
DefaultValueAssigner.DEFAULT_VALUE_SUFFIX),
- "0");
- });
- StreamTableWrite write = table.newWrite(commitUser);
- StreamTableCommit commit = table.newCommit(commitUser);
-
- write.write(rowDataWithKind(RowKind.INSERT, 2, 20, 200L));
- write.write(rowDataWithKind(RowKind.INSERT, 2, 21, null));
- commit.commit(0, write.prepareCommit(true, 0));
-
- write.close();
- commit.close();
-
- AuditLogTable auditLogTable = new AuditLogTable(table);
- Function<InternalRow, String> rowDataToString =
- row ->
- internalRowToString(
- row,
- DataTypes.ROW(
- DataTypes.STRING(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.BIGINT()));
-
- PredicateBuilder predicateBuilder = new
PredicateBuilder(auditLogTable.rowType());
-
- SnapshotReader snapshotReader =
- auditLogTable
- .newSnapshotReader()
- .withFilter(
- and(
-
predicateBuilder.equal(predicateBuilder.indexOf("b"), 300),
-
predicateBuilder.equal(predicateBuilder.indexOf("pt"), 2)));
- InnerTableRead read = auditLogTable.newRead();
- List<String> result =
- getResult(read, toSplits(snapshotReader.read().dataSplits()),
rowDataToString);
- assertThat(result).containsExactlyInAnyOrder("+I[+I, 2, 20, 200]",
"+I[+I, 2, 21, 0]");
- }
-
@Test
public void testAuditLog() throws Exception {
FileStoreTable table =
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
deleted file mode 100644
index ec253143a7..0000000000
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source.snapshot;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.operation.DefaultValueAssigner;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.sink.StreamTableCommit;
-import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.source.StreamDataTableScan;
-import org.apache.paimon.table.source.TableRead;
-import org.apache.paimon.table.source.TableScan;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** test default value on streaming scan. */
-public class DefaultValueScannerTest extends ScannerTestBase {
- @Test
- public void testDefaultValue() throws Exception {
- TableRead read = table.newRead();
- StreamTableWrite write = table.newWrite(commitUser);
- StreamTableCommit commit = table.newCommit(commitUser);
- StreamDataTableScan scan = table.newStreamScan();
-
- write.write(rowData(1, 10, 101L));
- commit.commit(0, write.prepareCommit(true, 0));
-
- write.write(rowData(1, 10, null));
- commit.commit(1, write.prepareCommit(true, 1));
-
- {
- TableScan.Plan plan = scan.plan();
- assertThat(getResult(read, plan.splits()))
- .hasSameElementsAs(Arrays.asList("+I 1|10|100"));
- }
-
- write.write(rowData(2, 11, 200L));
- write.write(rowData(2, 12, null));
- commit.commit(1, write.prepareCommit(true, 1));
-
- {
- // Predicate pushdown for default fields will not work
- PredicateBuilder predicateBuilder =
- new PredicateBuilder(table.schema().logicalRowType());
-
- Predicate ptEqual =
predicateBuilder.equal(predicateBuilder.indexOf("pt"), 2);
- Predicate bEqual =
predicateBuilder.equal(predicateBuilder.indexOf("b"), 200);
- Predicate predicate = PredicateBuilder.and(ptEqual, bEqual);
-
- TableScan.Plan plan = scan.withFilter(predicate).plan();
- read = table.newRead().withFilter(predicate);
- List<String> result = getResult(read, plan.splits());
- assertThat(result).hasSameElementsAs(Arrays.asList("+I 2|11|200",
"+I 2|12|100"));
- }
- write.close();
- commit.close();
- }
-
- protected FileStoreTable createFileStoreTable() throws Exception {
- Options options = new Options();
- options.set(
- String.format(
- "%s.%s.%s",
- CoreOptions.FIELDS_PREFIX, "b",
DefaultValueAssigner.DEFAULT_VALUE_SUFFIX),
- "100");
- return createFileStoreTable(options);
- }
-}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index 676836d15c..f5e6a797ca 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -23,15 +23,12 @@ import org.apache.paimon.flink.sink.FlinkTableSink;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.BlockingIterator;
-import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
-
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -58,7 +55,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
@@ -75,10 +71,7 @@ import java.util.stream.Stream;
import static
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
import static org.apache.paimon.CoreOptions.BUCKET;
-import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
-import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
-import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
import static
org.apache.paimon.flink.FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM;
@@ -1586,79 +1579,6 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
changelogRow("+I", 3L, "Euro", 119L, "2022-01-02")));
}
- @Test
- public void testDefaultValueWithoutPrimaryKey() throws Exception {
- Map<String, String> options = new HashMap<>();
- options.put(
- CoreOptions.FIELDS_PREFIX + ".rate." +
DefaultValueAssigner.DEFAULT_VALUE_SUFFIX,
- "1000");
-
- String table =
- createTable(
- Arrays.asList(
- "id BIGINT NOT NULL",
- "currency STRING",
- "rate BIGINT",
- "dt String"),
- Collections.emptyList(),
- Collections.singletonList("id"),
- Collections.emptyList(),
- options);
- insertInto(
- table,
- "(1, 'US Dollar', 114, '2022-01-01')",
- "(2, 'Yen', cast(null as int), '2022-01-01')",
- "(3, 'Euro', cast(null as int), '2022-01-01')",
- "(3, 'Euro', 119, '2022-01-02')");
-
- List<Row> expectedRecords =
- Arrays.asList(
- // part = 2022-01-01
- changelogRow("+I", 2L, "Yen", 1000L, "2022-01-01"),
- changelogRow("+I", 3L, "Euro", 1000L, "2022-01-01"));
-
- String querySql = String.format("SELECT * FROM %s where rate = 1000",
table);
- testBatchRead(querySql, expectedRecords);
- }
-
- @ParameterizedTest
- @EnumSource(CoreOptions.MergeEngine.class)
- public void testDefaultValueWithPrimaryKey(CoreOptions.MergeEngine
mergeEngine)
- throws Exception {
- Map<String, String> options = new HashMap<>();
- options.put(
- CoreOptions.FIELDS_PREFIX + ".rate." +
DefaultValueAssigner.DEFAULT_VALUE_SUFFIX,
- "1000");
- options.put(MERGE_ENGINE.key(), mergeEngine.toString());
- if (mergeEngine == FIRST_ROW) {
- options.put(CHANGELOG_PRODUCER.key(), LOOKUP.toString());
- }
- String table =
- createTable(
- Arrays.asList(
- "id BIGINT NOT NULL",
- "currency STRING",
- "rate BIGINT",
- "dt String"),
- Lists.newArrayList("id", "dt"),
- Collections.emptyList(),
- Lists.newArrayList("dt"),
- options);
- insertInto(
- table,
- "(1, 'US Dollar', 114, '2022-01-01')",
- "(2, 'Yen', cast(null as int), '2022-01-01')",
- "(2, 'Yen', cast(null as int), '2022-01-01')",
- "(3, 'Euro', cast(null as int) , '2022-01-02')");
-
- List<Row> expectedRecords =
- Arrays.asList(changelogRow("+I", 3L, "Euro", 1000L,
"2022-01-02"));
-
- String querySql =
- String.format("SELECT * FROM %s where rate = 1000 and currency
='Euro'", table);
- testBatchRead(querySql, expectedRecords);
- }
-
@Test
public void testUpdateWithoutPrimaryKey() throws Exception {
// Step1: define table schema