This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8d28a03022 [core] Introduce specific options for changelog files
(#5451)
8d28a03022 is described below
commit 8d28a0302234cc1bf7a15c882b924f9d58b2eeea
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Apr 11 18:47:25 2025 +0800
[core] Introduce specific options for changelog files (#5451)
---
.../shortcodes/generated/core_configuration.html | 18 ++
.../main/java/org/apache/paimon/CoreOptions.java | 35 ++++
.../org/apache/paimon/io/FileWriterContext.java | 48 +++++
.../apache/paimon/io/KeyValueDataFileWriter.java | 15 +-
.../paimon/io/KeyValueDataFileWriterImpl.java | 9 +-
.../paimon/io/KeyValueFileWriterFactory.java | 210 +++++++++++++--------
.../paimon/io/KeyValueThinDataFileWriterImpl.java | 11 +-
.../org/apache/paimon/io/RowDataFileWriter.java | 15 +-
.../apache/paimon/io/RowDataRollingFileWriter.java | 18 +-
.../paimon/io/StatsCollectingSingleFileWriter.java | 9 +-
.../apache/paimon/utils/FileStorePathFactory.java | 15 +-
.../paimon/io/KeyValueFileReadWriteTest.java | 35 ++--
.../apache/paimon/io/RollingFileWriterTest.java | 17 +-
.../mergetree/ChangelogMergeTreeRewriterTest.java | 2 +-
.../paimon/mergetree/ContainsLevelsTest.java | 4 +-
.../apache/paimon/mergetree/LookupLevelsTest.java | 4 +-
.../apache/paimon/mergetree/MergeTreeTestBase.java | 4 +-
.../paimon/table/PrimaryKeySimpleTableTest.java | 47 ++++-
18 files changed, 337 insertions(+), 179 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 92d5894af3..05b6c60bc1 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -68,12 +68,30 @@ under the License.
<td>MemorySize</td>
<td>Memory page size for caching.</td>
</tr>
+ <tr>
+ <td><h5>changelog-file.compression</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Changelog file compression.</td>
+ </tr>
+ <tr>
+ <td><h5>changelog-file.format</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Specify the message format of changelog files, currently
parquet, avro and orc are supported.</td>
+ </tr>
<tr>
<td><h5>changelog-file.prefix</h5></td>
<td style="word-wrap: break-word;">"changelog-"</td>
<td>String</td>
<td>Specify the file name prefix of changelog files.</td>
</tr>
+ <tr>
+ <td><h5>changelog-file.stats-mode</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Changelog file metadata stats collection. none, counts,
truncate(16), full is available.</td>
+ </tr>
<tr>
<td><h5>changelog-producer</h5></td>
<td style="word-wrap: break-word;">none</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index e59b30a560..7bc6148f9f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -226,6 +226,26 @@ public class CoreOptions implements Serializable {
.defaultValue("changelog-")
.withDescription("Specify the file name prefix of
changelog files.");
+ public static final ConfigOption<String> CHANGELOG_FILE_FORMAT =
+ key("changelog-file.format")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Specify the message format of changelog files,
currently parquet, avro and orc are supported.");
+
+ public static final ConfigOption<String> CHANGELOG_FILE_COMPRESSION =
+ key("changelog-file.compression")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Changelog file compression.");
+
+ public static final ConfigOption<String> CHANGELOG_FILE_STATS_MODE =
+ key("changelog-file.stats-mode")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Changelog file metadata stats collection. none,
counts, truncate(16), full is available.");
+
public static final ConfigOption<Boolean> FILE_SUFFIX_INCLUDE_COMPRESSION =
key("file.suffix.include.compression")
.booleanType()
@@ -1846,6 +1866,21 @@ public class CoreOptions implements Serializable {
return options.get(CHANGELOG_FILE_PREFIX);
}
+ @Nullable
+ public String changelogFileFormat() {
+ return options.get(CHANGELOG_FILE_FORMAT);
+ }
+
+ @Nullable
+ public String changelogFileCompression() {
+ return options.get(CHANGELOG_FILE_COMPRESSION);
+ }
+
+ @Nullable
+ public String changelogFileStatsMode() {
+ return options.get(CHANGELOG_FILE_STATS_MODE);
+ }
+
public boolean fileSuffixIncludeCompression() {
return options.get(FILE_SUFFIX_INCLUDE_COMPRESSION);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/FileWriterContext.java
b/paimon-core/src/main/java/org/apache/paimon/io/FileWriterContext.java
new file mode 100644
index 0000000000..58d7758b17
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileWriterContext.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.format.FormatWriterFactory;
+
+/** Context of file writer. */
+public class FileWriterContext {
+
+ private final FormatWriterFactory factory;
+ private final SimpleStatsProducer statsProducer;
+ private final String compression;
+
+ public FileWriterContext(
+ FormatWriterFactory factory, SimpleStatsProducer statsProducer,
String compression) {
+ this.factory = factory;
+ this.statsProducer = statsProducer;
+ this.compression = compression;
+ }
+
+ public FormatWriterFactory factory() {
+ return factory;
+ }
+
+ public SimpleStatsProducer statsProducer() {
+ return statsProducer;
+ }
+
+ public String compression() {
+ return compression;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
index 0f5b4e9985..a8ea12dabd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
@@ -24,7 +24,6 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.fileindex.FileIndexOptions;
-import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.SimpleColStats;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -78,29 +77,19 @@ public abstract class KeyValueDataFileWriter
public KeyValueDataFileWriter(
FileIO fileIO,
- FormatWriterFactory factory,
+ FileWriterContext context,
Path path,
Function<KeyValue, InternalRow> converter,
RowType keyType,
RowType valueType,
RowType writeRowType,
- SimpleStatsProducer statsProducer,
long schemaId,
int level,
- String compression,
CoreOptions options,
FileSource fileSource,
FileIndexOptions fileIndexOptions,
boolean isExternalPath) {
- super(
- fileIO,
- factory,
- path,
- converter,
- writeRowType,
- statsProducer,
- compression,
- options.asyncFileWrite());
+ super(fileIO, context, path, converter, writeRowType,
options.asyncFileWrite());
this.keyType = keyType;
this.valueType = valueType;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java
index 053731c9cf..798c70ed23 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriterImpl.java
@@ -22,7 +22,6 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fileindex.FileIndexOptions;
-import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.SimpleColStats;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -38,31 +37,27 @@ public class KeyValueDataFileWriterImpl extends
KeyValueDataFileWriter {
public KeyValueDataFileWriterImpl(
FileIO fileIO,
- FormatWriterFactory factory,
+ FileWriterContext context,
Path path,
Function<KeyValue, InternalRow> converter,
RowType keyType,
RowType valueType,
- SimpleStatsProducer statsProducer,
long schemaId,
int level,
- String compression,
CoreOptions options,
FileSource fileSource,
FileIndexOptions fileIndexOptions,
boolean isExternalPath) {
super(
fileIO,
- factory,
+ context,
path,
converter,
keyType,
valueType,
KeyValue.schema(keyType, valueType),
- statsProducer,
schemaId,
level,
- compression,
options,
fileSource,
fileIndexOptions,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
index 20cb8874d9..014df2e4c3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java
@@ -41,14 +41,17 @@ import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.StatsCollectorFactories;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.function.IntFunction;
+import java.util.function.Function;
import java.util.stream.Collectors;
/** A factory to create {@link FileWriter}s for writing {@link KeyValue}
files. */
@@ -58,7 +61,7 @@ public class KeyValueFileWriterFactory {
private final long schemaId;
private final RowType keyType;
private final RowType valueType;
- private final WriteFormatContext formatContext;
+ private final FileWriterContextFactory formatContext;
private final long suggestedFileSize;
private final CoreOptions options;
private final FileIndexOptions fileIndexOptions;
@@ -66,7 +69,7 @@ public class KeyValueFileWriterFactory {
private KeyValueFileWriterFactory(
FileIO fileIO,
long schemaId,
- WriteFormatContext formatContext,
+ FileWriterContextFactory formatContext,
long suggestedFileSize,
CoreOptions options) {
this.fileIO = fileIO;
@@ -89,27 +92,29 @@ public class KeyValueFileWriterFactory {
@VisibleForTesting
public DataFilePathFactory pathFactory(int level) {
- return formatContext.pathFactory(level);
+ return formatContext.pathFactory(new WriteFormatKey(level, false));
}
public RollingFileWriter<KeyValue, DataFileMeta>
createRollingMergeTreeFileWriter(
int level, FileSource fileSource) {
+ WriteFormatKey key = new WriteFormatKey(level, false);
return new RollingFileWriter<>(
() -> {
- DataFilePathFactory pathFactory =
formatContext.pathFactory(level);
+ DataFilePathFactory pathFactory =
formatContext.pathFactory(key);
return createDataFileWriter(
- pathFactory.newPath(), level, fileSource,
pathFactory.isExternalPath());
+ pathFactory.newPath(), key, fileSource,
pathFactory.isExternalPath());
},
suggestedFileSize);
}
public RollingFileWriter<KeyValue, DataFileMeta>
createRollingChangelogFileWriter(int level) {
+ WriteFormatKey key = new WriteFormatKey(level, true);
return new RollingFileWriter<>(
() -> {
- DataFilePathFactory pathFactory =
formatContext.pathFactory(level);
+ DataFilePathFactory pathFactory =
formatContext.pathFactory(key);
return createDataFileWriter(
pathFactory.newChangelogPath(),
- level,
+ key,
FileSource.APPEND,
pathFactory.isExternalPath());
},
@@ -117,34 +122,30 @@ public class KeyValueFileWriterFactory {
}
private KeyValueDataFileWriter createDataFileWriter(
- Path path, int level, FileSource fileSource, boolean
isExternalPath) {
+ Path path, WriteFormatKey key, FileSource fileSource, boolean
isExternalPath) {
return formatContext.thinModeEnabled
? new KeyValueThinDataFileWriterImpl(
fileIO,
- formatContext.writerFactory(level),
+ formatContext.fileWriterContext(key),
path,
new KeyValueThinSerializer(keyType, valueType)::toRow,
keyType,
valueType,
- formatContext.statsProducer(level, options),
schemaId,
- level,
- formatContext.compression(level),
+ key.level,
options,
fileSource,
fileIndexOptions,
isExternalPath)
: new KeyValueDataFileWriterImpl(
fileIO,
- formatContext.writerFactory(level),
+ formatContext.fileWriterContext(key),
path,
new KeyValueSerializer(keyType, valueType)::toRow,
keyType,
valueType,
- formatContext.statsProducer(level, options),
schemaId,
- level,
- formatContext.compression(level),
+ key.level,
options,
fileSource,
fileIndexOptions,
@@ -152,12 +153,24 @@ public class KeyValueFileWriterFactory {
}
public void deleteFile(DataFileMeta file) {
-
fileIO.deleteQuietly(formatContext.pathFactory(file.level()).toPath(file));
+ // this path factory is only for path generation, so we don't care
about the true or false
+ // in WriteFormatKey
+ fileIO.deleteQuietly(
+ formatContext.pathFactory(new WriteFormatKey(file.level(),
false)).toPath(file));
}
public void copyFile(DataFileMeta sourceFile, DataFileMeta targetFile)
throws IOException {
- Path sourcePath =
formatContext.pathFactory(sourceFile.level()).toPath(sourceFile);
- Path targetPath =
formatContext.pathFactory(targetFile.level()).toPath(targetFile);
+ // this path factory is only for path generation, so we don't care
about the true or false
+ // in WriteFormatKey
+ boolean isChangelog = false;
+ Path sourcePath =
+ formatContext
+ .pathFactory(new WriteFormatKey(sourceFile.level(),
isChangelog))
+ .toPath(sourceFile);
+ Path targetPath =
+ formatContext
+ .pathFactory(new WriteFormatKey(targetFile.level(),
isChangelog))
+ .toPath(targetFile);
fileIO.copyFile(sourcePath, targetPath, true);
}
@@ -166,7 +179,7 @@ public class KeyValueFileWriterFactory {
}
public String newChangelogFileName(int level) {
- return formatContext.pathFactory(level).newChangelogFileName();
+ return formatContext.pathFactory(new WriteFormatKey(level,
true)).newChangelogFileName();
}
public static Builder builder(
@@ -175,7 +188,7 @@ public class KeyValueFileWriterFactory {
RowType keyType,
RowType valueType,
FileFormat fileFormat,
- Map<String, FileStorePathFactory> format2PathFactory,
+ Function<String, FileStorePathFactory> format2PathFactory,
long suggestedFileSize) {
return new Builder(
fileIO,
@@ -195,7 +208,7 @@ public class KeyValueFileWriterFactory {
private final RowType keyType;
private final RowType valueType;
private final FileFormat fileFormat;
- private final Map<String, FileStorePathFactory> format2PathFactory;
+ private final Function<String, FileStorePathFactory>
format2PathFactory;
private final long suggestedFileSize;
private Builder(
@@ -204,7 +217,7 @@ public class KeyValueFileWriterFactory {
RowType keyType,
RowType valueType,
FileFormat fileFormat,
- Map<String, FileStorePathFactory> format2PathFactory,
+ Function<String, FileStorePathFactory> format2PathFactory,
long suggestedFileSize) {
this.fileIO = fileIO;
this.schemaId = schemaId;
@@ -217,8 +230,8 @@ public class KeyValueFileWriterFactory {
public KeyValueFileWriterFactory build(
BinaryRow partition, int bucket, CoreOptions options) {
- WriteFormatContext context =
- new WriteFormatContext(
+ FileWriterContextFactory context =
+ new FileWriterContextFactory(
partition,
bucket,
keyType,
@@ -231,11 +244,11 @@ public class KeyValueFileWriterFactory {
}
}
- private static class WriteFormatContext {
+ private static class FileWriterContextFactory {
- private final IntFunction<String> level2Format;
- private final IntFunction<String> level2Compress;
- private final IntFunction<String> level2Stats;
+ private final Function<WriteFormatKey, String> key2Format;
+ private final Function<WriteFormatKey, String> key2Compress;
+ private final Function<WriteFormatKey, String> key2Stats;
private final Map<Pair<String, String>, Optional<SimpleStatsExtractor>>
formatStats2Extractor;
@@ -249,17 +262,17 @@ public class KeyValueFileWriterFactory {
private final RowType keyType;
private final RowType valueType;
private final RowType writeRowType;
- private final Map<String, FileStorePathFactory> parentFactories;
+ private final Function<String, FileStorePathFactory> parentFactories;
private final CoreOptions options;
private final boolean thinModeEnabled;
- private WriteFormatContext(
+ private FileWriterContextFactory(
BinaryRow partition,
int bucket,
RowType keyType,
RowType valueType,
- FileFormat defaultFormat,
- Map<String, FileStorePathFactory> parentFactories,
+ FileFormat defaultFileFormat,
+ Function<String, FileStorePathFactory> parentFactories,
CoreOptions options) {
this.partition = partition;
this.bucket = bucket;
@@ -271,20 +284,39 @@ public class KeyValueFileWriterFactory {
options.dataFileThinMode() && supportsThinMode(keyType,
valueType);
this.writeRowType =
KeyValue.schema(thinModeEnabled ? RowType.of() : keyType,
valueType);
+
Map<Integer, String> fileFormatPerLevel =
options.fileFormatPerLevel();
- this.level2Format =
- level ->
- fileFormatPerLevel.getOrDefault(
- level,
defaultFormat.getFormatIdentifier());
+ String defaultFormat = defaultFileFormat.getFormatIdentifier();
+ @Nullable String changelogFormat = options.changelogFileFormat();
+ this.key2Format =
+ key -> {
+ if (key.isChangelog && changelogFormat != null) {
+ return changelogFormat;
+ }
+ return fileFormatPerLevel.getOrDefault(key.level,
defaultFormat);
+ };
String defaultCompress = options.fileCompression();
+ @Nullable String changelogCompression =
options.changelogFileCompression();
Map<Integer, String> fileCompressionPerLevel =
options.fileCompressionPerLevel();
- this.level2Compress =
- level -> fileCompressionPerLevel.getOrDefault(level,
defaultCompress);
+ this.key2Compress =
+ key -> {
+ if (key.isChangelog && changelogCompression != null) {
+ return changelogCompression;
+ }
+ return fileCompressionPerLevel.getOrDefault(key.level,
defaultCompress);
+ };
String statsMode = options.statsMode();
Map<Integer, String> statsModePerLevel =
options.statsModePerLevel();
- this.level2Stats = level -> statsModePerLevel.getOrDefault(level,
statsMode);
+ @Nullable String changelogStatsMode =
options.changelogFileStatsMode();
+ this.key2Stats =
+ key -> {
+ if (key.isChangelog && changelogStatsMode != null) {
+ return changelogStatsMode;
+ }
+ return statsModePerLevel.getOrDefault(key.level,
statsMode);
+ };
this.formatStats2Extractor = new HashMap<>();
this.statsMode2AvroStats = new HashMap<>();
@@ -308,16 +340,21 @@ public class KeyValueFileWriterFactory {
return true;
}
- private SimpleStatsProducer statsProducer(int level, CoreOptions
options) {
- String format = level2Format.apply(level);
- String statsMode = level2Stats.apply(level);
+ private FileWriterContext fileWriterContext(WriteFormatKey key) {
+ return new FileWriterContext(
+ writerFactory(key), statsProducer(key),
key2Compress.apply(key));
+ }
+
+ private SimpleStatsProducer statsProducer(WriteFormatKey key) {
+ String format = key2Format.apply(key);
+ String statsMode = key2Stats.apply(key);
if (format.equals("avro")) {
// In avro format, minValue, maxValue, and nullCount are not
counted, so use
// SimpleStatsExtractor to collect stats
SimpleColStatsCollector.Factory[] factories =
statsMode2AvroStats.computeIfAbsent(
statsMode,
- key ->
+ k ->
StatsCollectorFactories.createStatsFactoriesForAvro(
statsMode, options,
writeRowType.getFieldNames()));
SimpleStatsCollector collector = new
SimpleStatsCollector(writeRowType, factories);
@@ -327,56 +364,71 @@ public class KeyValueFileWriterFactory {
Optional<SimpleStatsExtractor> extractor =
formatStats2Extractor.computeIfAbsent(
Pair.of(format, statsMode),
- key -> {
- SimpleColStatsCollector.Factory[]
statsFactories =
-
StatsCollectorFactories.createStatsFactories(
- statsMode,
- options,
- writeRowType.getFieldNames(),
- thinModeEnabled
- ?
keyType.getFieldNames()
- :
Collections.emptyList());
- boolean isDisabled =
- Arrays.stream(
-
SimpleColStatsCollector.create(
-
statsFactories))
- .allMatch(
- p ->
- p
-
instanceof
-
NoneSimpleColStatsCollector);
- if (isDisabled) {
- return Optional.empty();
- }
- return fileFormat(format)
- .createStatsExtractor(writeRowType,
statsFactories);
- });
+ k -> createSimpleStatsExtractor(format,
statsMode));
return SimpleStatsProducer.fromExtractor(extractor.orElse(null));
}
- private DataFilePathFactory pathFactory(int level) {
- String format = level2Format.apply(level);
+ private Optional<SimpleStatsExtractor> createSimpleStatsExtractor(
+ String format, String statsMode) {
+ SimpleColStatsCollector.Factory[] statsFactories =
+ StatsCollectorFactories.createStatsFactories(
+ statsMode,
+ options,
+ writeRowType.getFieldNames(),
+ thinModeEnabled ? keyType.getFieldNames() :
Collections.emptyList());
+ boolean isDisabled =
+
Arrays.stream(SimpleColStatsCollector.create(statsFactories))
+ .allMatch(p -> p instanceof
NoneSimpleColStatsCollector);
+ if (isDisabled) {
+ return Optional.empty();
+ }
+ return fileFormat(format).createStatsExtractor(writeRowType,
statsFactories);
+ }
+
+ private DataFilePathFactory pathFactory(WriteFormatKey key) {
+ String format = key2Format.apply(key);
return format2PathFactory.computeIfAbsent(
format,
- key ->
+ k ->
parentFactories
- .get(format)
+ .apply(format)
.createDataFilePathFactory(partition,
bucket));
}
- private FormatWriterFactory writerFactory(int level) {
+ private FormatWriterFactory writerFactory(WriteFormatKey key) {
return format2WriterFactory.computeIfAbsent(
- level2Format.apply(level),
+ key2Format.apply(key),
format ->
fileFormat(format).createWriterFactory(writeRowType));
}
- private String compression(int level) {
- return level2Compress.apply(level);
- }
-
private FileFormat fileFormat(String format) {
return formatFactory.computeIfAbsent(
format, k -> FileFormat.fromIdentifier(format,
options.toConfiguration()));
}
}
+
+ private static class WriteFormatKey {
+
+ private final int level;
+ private final boolean isChangelog;
+
+ private WriteFormatKey(int level, boolean isChangelog) {
+ this.level = level;
+ this.isChangelog = isChangelog;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ WriteFormatKey formatKey = (WriteFormatKey) o;
+ return level == formatKey.level && isChangelog ==
formatKey.isChangelog;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(level, isChangelog);
+ }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java
index 128599939a..86d8df0ce7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueThinDataFileWriterImpl.java
@@ -22,7 +22,6 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fileindex.FileIndexOptions;
-import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.SimpleColStats;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -46,31 +45,27 @@ public class KeyValueThinDataFileWriterImpl extends
KeyValueDataFileWriter {
public KeyValueThinDataFileWriterImpl(
FileIO fileIO,
- FormatWriterFactory factory,
+ FileWriterContext context,
Path path,
Function<KeyValue, InternalRow> converter,
RowType keyType,
RowType valueType,
- SimpleStatsProducer statsProducer,
long schemaId,
int level,
- String compression,
CoreOptions options,
FileSource fileSource,
FileIndexOptions fileIndexOptions,
boolean isExternalPath) {
super(
fileIO,
- factory,
+ context,
path,
converter,
keyType,
valueType,
KeyValue.schema(RowType.of(), valueType),
- statsProducer,
schemaId,
level,
- compression,
options,
fileSource,
fileIndexOptions,
@@ -97,7 +92,7 @@ public class KeyValueThinDataFileWriterImpl extends
KeyValueDataFileWriter {
Pair<SimpleColStats[], SimpleColStats[]>
fetchKeyValueStats(SimpleColStats[] rowStats) {
int numKeyFields = keyType.getFieldCount();
// In thin mode, there is no key stats in rowStats, so we only jump
- // _SEQUNCE_NUMBER_ and _ROW_KIND_ stats. Therefore, the 'from' value
is 2.
+ // _SEQUENCE_NUMBER_ and _ROW_KIND_ stats. Therefore, the 'from' value
is 2.
SimpleColStats[] valFieldStats = Arrays.copyOfRange(rowStats, 2,
rowStats.length);
// Thin mode on, so need to map value stats to key stats.
SimpleColStats[] keyStats = new SimpleColStats[numKeyFields];
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index b5d2c9f0df..432845de32 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -20,7 +20,6 @@ package org.apache.paimon.io;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fileindex.FileIndexOptions;
-import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.FileSource;
@@ -54,27 +53,17 @@ public class RowDataFileWriter extends
StatsCollectingSingleFileWriter<InternalR
public RowDataFileWriter(
FileIO fileIO,
- FormatWriterFactory factory,
+ FileWriterContext context,
Path path,
RowType writeSchema,
- SimpleStatsProducer statsProducer,
long schemaId,
LongCounter seqNumCounter,
- String fileCompression,
FileIndexOptions fileIndexOptions,
FileSource fileSource,
boolean asyncFileWrite,
boolean statsDenseStore,
boolean isExternalPath) {
- super(
- fileIO,
- factory,
- path,
- Function.identity(),
- writeSchema,
- statsProducer,
- fileCompression,
- asyncFileWrite);
+ super(fileIO, context, path, Function.identity(), writeSchema,
asyncFileWrite);
this.schemaId = schemaId;
this.seqNumCounter = seqNumCounter;
this.isExternalPath = isExternalPath;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
index b6f4303946..9b68b774f7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
@@ -18,6 +18,7 @@
package org.apache.paimon.io;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
@@ -53,13 +54,12 @@ public class RowDataRollingFileWriter extends
RollingFileWriter<InternalRow, Dat
() ->
new RowDataFileWriter(
fileIO,
- fileFormat.createWriterFactory(writeSchema),
+ createFileWriterContext(
+ fileFormat, writeSchema,
statsCollectors, fileCompression),
pathFactory.newPath(),
writeSchema,
- createStatsProducer(fileFormat, writeSchema,
statsCollectors),
schemaId,
seqNumCounter,
- fileCompression,
fileIndexOptions,
fileSource,
asyncFileWrite,
@@ -68,6 +68,18 @@ public class RowDataRollingFileWriter extends
RollingFileWriter<InternalRow, Dat
targetFileSize);
}
+ @VisibleForTesting
+ static FileWriterContext createFileWriterContext(
+ FileFormat fileFormat,
+ RowType rowType,
+ SimpleColStatsCollector.Factory[] statsCollectors,
+ String fileCompression) {
+ return new FileWriterContext(
+ fileFormat.createWriterFactory(rowType),
+ createStatsProducer(fileFormat, rowType, statsCollectors),
+ fileCompression);
+ }
+
private static SimpleStatsProducer createStatsProducer(
FileFormat fileFormat,
RowType rowType,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
index e286836fdf..20cebfcb27 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
@@ -19,7 +19,6 @@
package org.apache.paimon.io;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.SimpleColStats;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -45,16 +44,14 @@ public abstract class StatsCollectingSingleFileWriter<T, R>
extends SingleFileWr
public StatsCollectingSingleFileWriter(
FileIO fileIO,
- FormatWriterFactory factory,
+ FileWriterContext context,
Path path,
Function<T, InternalRow> converter,
RowType rowType,
- SimpleStatsProducer statsProducer,
- String compression,
boolean asyncWrite) {
- super(fileIO, factory, path, converter, compression, asyncWrite);
+ super(fileIO, context.factory(), path, converter,
context.compression(), asyncWrite);
this.rowType = rowType;
- this.statsProducer = statsProducer;
+ this.statsProducer = context.statsProducer();
this.isStatsDisabled = statsProducer.isStatsDisabled();
this.statsRequirePerRecord = statsProducer.requirePerRecord();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 45f6fb3f3b..b63023653a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -30,14 +30,13 @@ import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
+import java.util.function.Function;
import java.util.stream.Collectors;
/** Factory which produces {@link Path}s for manifest files. */
@@ -297,14 +296,10 @@ public class FileStorePathFactory {
};
}
- public static Map<String, FileStorePathFactory> createFormatPathFactories(
+ public static Function<String, FileStorePathFactory>
createFormatPathFactories(
CoreOptions options,
BiFunction<CoreOptions, String, FileStorePathFactory>
formatPathFactory) {
- Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
- Set<String> formats = new
HashSet<>(options.fileFormatPerLevel().values());
- formats.add(options.fileFormatString());
- formats.forEach(
- format -> pathFactoryMap.put(format,
formatPathFactory.apply(options, format)));
- return pathFactoryMap;
+ Map<String, FileStorePathFactory> map = new ConcurrentHashMap<>();
+ return format -> map.computeIfAbsent(format, k ->
formatPathFactory.apply(options, format));
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index 2d0efdb6d1..10479892b0 100644
---
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -53,7 +53,6 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
@@ -244,22 +243,24 @@ public class KeyValueFileReadWriteTest {
Options options = new Options();
options.set(CoreOptions.METADATA_STATS_MODE, "FULL");
- Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
- pathFactoryMap.put(format, pathFactory);
- pathFactoryMap.put(
- CoreOptions.FILE_FORMAT.defaultValue().toString(),
- new FileStorePathFactory(
- path,
- RowType.of(),
- CoreOptions.PARTITION_DEFAULT_NAME.defaultValue(),
- CoreOptions.FILE_FORMAT.defaultValue().toString(),
- CoreOptions.DATA_FILE_PREFIX.defaultValue(),
- CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
-
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
-
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
- CoreOptions.FILE_COMPRESSION.defaultValue(),
- null,
- null));
+ Function<String, FileStorePathFactory> pathFactoryMap =
+ new Function<String, FileStorePathFactory>() {
+ @Override
+ public FileStorePathFactory apply(String format) {
+ return new FileStorePathFactory(
+ path,
+ RowType.of(),
+
CoreOptions.PARTITION_DEFAULT_NAME.defaultValue(),
+ format,
+ CoreOptions.DATA_FILE_PREFIX.defaultValue(),
+
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
+
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
+
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+ CoreOptions.FILE_COMPRESSION.defaultValue(),
+ null,
+ null);
+ }
+ };
return KeyValueFileWriterFactory.builder(
fileIO,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index d3eeb1fdef..a11c0e4d78 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -71,7 +71,12 @@ public class RollingFileWriterTest {
() ->
new RowDataFileWriter(
LocalFileIO.create(),
- fileFormat.createWriterFactory(SCHEMA),
+
RowDataRollingFileWriter.createFileWriterContext(
+ fileFormat,
+ SCHEMA,
+
SimpleColStatsCollector.createFullStatsFactories(
+
SCHEMA.getFieldCount()),
+
CoreOptions.FILE_COMPRESSION.defaultValue()),
new DataFilePathFactory(
new Path(tempDir +
"/bucket-0"),
CoreOptions.FILE_FORMAT
@@ -86,18 +91,8 @@ public class RollingFileWriterTest {
null)
.newPath(),
SCHEMA,
- SimpleStatsProducer.fromExtractor(
- fileFormat
- .createStatsExtractor(
- SCHEMA,
-
SimpleColStatsCollector
-
.createFullStatsFactories(
-
SCHEMA
-
.getFieldCount()))
- .orElse(null)),
0L,
new LongCounter(0),
-
CoreOptions.FILE_COMPRESSION.defaultValue(),
new FileIndexOptions(),
FileSource.APPEND,
true,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java
index bbfa4d3659..47655a419b 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java
@@ -198,7 +198,7 @@ public class ChangelogMergeTreeRewriterTest {
keyType,
valueType,
new FlushingFileFormat("avro"),
- Collections.singletonMap("avro",
createNonPartFactory(path)),
+ k -> createNonPartFactory(path),
VALUE_128_MB.getBytes())
.build(BinaryRow.EMPTY_ROW, 0, new CoreOptions(new Options()));
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index fa9628b4c1..0bef84e7d8 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -63,6 +63,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.function.Function;
import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.apache.paimon.options.MemorySize.VALUE_128_MB;
@@ -223,8 +224,7 @@ public class ContainsLevelsTest {
private KeyValueFileWriterFactory createWriterFactory() {
Path path = new Path(tempDir.toUri().toString());
- Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
- pathFactoryMap.put("avro", createNonPartFactory(path));
+ Function<String, FileStorePathFactory> pathFactoryMap = k ->
createNonPartFactory(path);
return KeyValueFileWriterFactory.builder(
FileIOFinder.find(path),
0,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index b68a82935b..d4ddde5a44 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -64,6 +64,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.function.Function;
import static org.apache.paimon.KeyValue.UNKNOWN_SEQUENCE;
import static org.apache.paimon.io.DataFileTestUtils.row;
@@ -305,8 +306,7 @@ public class LookupLevelsTest {
private KeyValueFileWriterFactory createWriterFactory() {
Path path = new Path(tempDir.toUri().toString());
String identifier = "avro";
- Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
- pathFactoryMap.put(identifier, createNonPartFactory(path));
+ Function<String, FileStorePathFactory> pathFactoryMap = k ->
createNonPartFactory(path);
return KeyValueFileWriterFactory.builder(
FileIOFinder.find(path),
0,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 1207daa86c..c149e44199 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -86,6 +86,7 @@ import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.function.Function;
import java.util.stream.Collectors;
import static java.util.Collections.singletonList;
@@ -175,8 +176,7 @@ public abstract class MergeTreeTestBase {
compactReaderFactory =
readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0,
DeletionVector.emptyFactory());
- Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
- pathFactoryMap.put(identifier, pathFactory);
+ Function<String, FileStorePathFactory> pathFactoryMap = k ->
pathFactory;
KeyValueFileWriterFactory.Builder writerFactoryBuilder =
KeyValueFileWriterFactory.builder(
LocalFileIO.create(),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 6f6be4b803..59dee1092a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -100,6 +100,8 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.CHANGELOG_FILE_FORMAT;
+import static org.apache.paimon.CoreOptions.CHANGELOG_FILE_STATS_MODE;
import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MAX;
import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MIN;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
@@ -109,6 +111,7 @@ import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.FILE_FORMAT_PARQUET;
import static org.apache.paimon.CoreOptions.LOOKUP_LOCAL_FILE_TYPE;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
+import static org.apache.paimon.CoreOptions.METADATA_STATS_MODE;
import static org.apache.paimon.CoreOptions.METADATA_STATS_MODE_PER_LEVEL;
import static org.apache.paimon.CoreOptions.MergeEngine;
import static org.apache.paimon.CoreOptions.MergeEngine.AGGREGATE;
@@ -528,10 +531,20 @@ public class PrimaryKeySimpleTableTest extends
SimpleTableTestBase {
"+2|22|202|binary|varbinary|mapKey:mapVal|multiset"));
}
- @Test
- public void testStreamingInputChangelog() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testStreamingInputChangelog(boolean specificConfig) throws
Exception {
FileStoreTable table =
- createFileStoreTable(conf -> conf.set(CHANGELOG_PRODUCER,
ChangelogProducer.INPUT));
+ createFileStoreTable(
+ conf -> {
+ conf.set(CHANGELOG_PRODUCER,
ChangelogProducer.INPUT);
+ if (specificConfig) {
+ conf.set(FILE_FORMAT, "parquet");
+ conf.set(METADATA_STATS_MODE, "full");
+ conf.set(CHANGELOG_FILE_FORMAT, "avro");
+ conf.set(CHANGELOG_FILE_STATS_MODE, "none");
+ }
+ });
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
write.write(rowData(1, 10, 100L));
@@ -560,6 +573,15 @@ public class PrimaryKeySimpleTableTest extends
SimpleTableTestBase {
"+U 1|20|201|binary|varbinary|mapKey:mapVal|multiset",
"-U 1|10|101|binary|varbinary|mapKey:mapVal|multiset",
"+U 1|10|102|binary|varbinary|mapKey:mapVal|multiset");
+
+ if (specificConfig) {
+ Snapshot snapshot = table.latestSnapshot().get();
+ ManifestFileMeta manifest =
+
table.manifestListReader().read(snapshot.changelogManifestList()).get(0);
+ DataFileMeta file =
table.manifestFileReader().read(manifest.fileName()).get(0).file();
+ assertThat(file.fileName()).endsWith(".avro");
+
assertThat(file.valueStats().minValues().getFieldCount()).isEqualTo(0);
+ }
}
@Test
@@ -2130,13 +2152,20 @@ public class PrimaryKeySimpleTableTest extends
SimpleTableTestBase {
innerTestTableQuery(table);
}
- @Test
- public void testLookupWithDropDelete() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testLookupWithDropDelete(boolean specificConfig) throws
Exception {
FileStoreTable table =
createFileStoreTable(
conf -> {
conf.set(CHANGELOG_PRODUCER, LOOKUP);
conf.set("num-levels", "2");
+ if (specificConfig) {
+ conf.set(FILE_FORMAT, "parquet");
+ conf.set(METADATA_STATS_MODE, "full");
+ conf.set(CHANGELOG_FILE_FORMAT, "avro");
+ conf.set(CHANGELOG_FILE_STATS_MODE, "none");
+ }
});
IOManager ioManager = IOManager.create(tablePath.toString());
StreamTableWrite write =
table.newWrite(commitUser).withIOManager(ioManager);
@@ -2165,6 +2194,14 @@ public class PrimaryKeySimpleTableTest extends
SimpleTableTestBase {
.isEqualTo(
Collections.singletonList(
"1|2|200|binary|varbinary|mapKey:mapVal|multiset"));
+
+ if (specificConfig) {
+ ManifestFileMeta manifest =
+
table.manifestListReader().read(latestSnapshot.changelogManifestList()).get(0);
+ DataFileMeta file =
table.manifestFileReader().read(manifest.fileName()).get(0).file();
+ assertThat(file.fileName()).endsWith(".avro");
+
assertThat(file.valueStats().minValues().getFieldCount()).isEqualTo(0);
+ }
}
@ParameterizedTest(name = "changelog-producer = {0}")