This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c761c6428 [core] use append-only mode as the default write mode (#976)
c761c6428 is described below
commit c761c6428b51e0f70b1a516f036ecd4b658d8087
Author: Kerwin <[email protected]>
AuthorDate: Thu May 4 15:57:55 2023 +0800
[core] use append-only mode as the default write mode (#976)
---
docs/layouts/shortcodes/generated/core_configuration.html | 4 ++--
.../src/main/java/org/apache/paimon/CoreOptions.java | 2 +-
paimon-core/src/main/java/org/apache/paimon/WriteMode.java | 3 +++
.../java/org/apache/paimon/table/FileStoreTableFactory.java | 13 ++++++++++---
.../java/org/apache/paimon/schema/SchemaManagerTest.java | 1 +
.../org/apache/paimon/tests/FileStoreStreamE2eTest.java | 1 +
.../java/org/apache/paimon/flink/FlinkActionITCase.java | 3 ++-
.../java/org/apache/paimon/flink/FlinkActionITCase.java | 3 ++-
.../org/apache/paimon/flink/ContinuousFileStoreITCase.java | 5 ++++-
.../test/java/org/apache/paimon/flink/FileStoreITCase.java | 3 +++
.../java/org/apache/paimon/flink/ForceCompactionITCase.java | 7 +++++--
.../java/org/apache/paimon/flink/ReadWriteTableITCase.java | 10 ++++++++--
.../org/apache/paimon/flink/action/DeleteActionITCase.java | 6 +++++-
.../apache/paimon/flink/util/ReadWriteTableTestUtil.java | 3 +++
.../org/apache/paimon/hive/PaimonStorageHandlerITCase.java | 1 +
.../apache/paimon/hive/mapred/PaimonRecordReaderTest.java | 2 ++
.../java/org/apache/paimon/spark/SimpleTableTestHelper.java | 4 +++-
.../java/org/apache/paimon/spark/SparkReadTestBase.java | 2 +-
18 files changed, 57 insertions(+), 16 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 6c704aa38..edf14f17f 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -394,9 +394,9 @@
</tr>
<tr>
<td><h5>write-mode</h5></td>
- <td style="word-wrap: break-word;">change-log</td>
+ <td style="word-wrap: break-word;">auto</td>
<td><p>Enum</p></td>
- <td>Specify the write mode for table.<br /><br />Possible
values:<ul><li>"append-only": The table can only accept append-only insert
operations. Neither data deduplication nor any primary key constraints will be
done when inserting rows into paimon.</li><li>"change-log": The table can
accept insert/delete/update operations.</li></ul></td>
+ <td>Specify the write mode for table.<br /><br />Possible
values:<ul><li>"auto": The change-log for table with primary key, append-only
for table without primary key.</li><li>"append-only": The table can only accept
append-only insert operations. Neither data deduplication nor any primary key
constraints will be done when inserting rows into paimon.</li><li>"change-log":
The table can accept insert/delete/update operations.</li></ul></td>
</tr>
<tr>
<td><h5>write-only</h5></td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index 7bb992cfa..0405d021f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -196,7 +196,7 @@ public class CoreOptions implements Serializable {
public static final ConfigOption<WriteMode> WRITE_MODE =
key("write-mode")
.enumType(WriteMode.class)
- .defaultValue(WriteMode.CHANGE_LOG)
+ .defaultValue(WriteMode.AUTO)
.withDescription("Specify the write mode for table.");
public static final ConfigOption<Boolean> WRITE_ONLY =
diff --git a/paimon-core/src/main/java/org/apache/paimon/WriteMode.java
b/paimon-core/src/main/java/org/apache/paimon/WriteMode.java
index e815435ee..70c435af4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/WriteMode.java
+++ b/paimon-core/src/main/java/org/apache/paimon/WriteMode.java
@@ -25,6 +25,9 @@ import static
org.apache.paimon.options.description.TextElement.text;
/** Defines the write mode for paimon. */
public enum WriteMode implements DescribedEnum {
+ AUTO(
+ "auto",
+ "The change-log for table with primary key, append-only for table
without primary key."),
APPEND_ONLY(
"append-only",
"The table can only accept append-only insert operations. Neither
data deduplication nor any "
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
index 7a0d099b8..e2d634b75 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
@@ -72,8 +72,16 @@ public class FileStoreTableFactory {
public static FileStoreTable create(
FileIO fileIO, Path tablePath, TableSchema tableSchema, Options
dynamicOptions) {
FileStoreTable table;
- if (Options.fromMap(tableSchema.options()).get(CoreOptions.WRITE_MODE)
- == WriteMode.APPEND_ONLY) {
+ Options coreOptions = Options.fromMap(tableSchema.options());
+ WriteMode writeMode = coreOptions.get(CoreOptions.WRITE_MODE);
+ if (writeMode == WriteMode.AUTO) {
+ writeMode =
+ tableSchema.primaryKeys().isEmpty()
+ ? WriteMode.APPEND_ONLY
+ : WriteMode.CHANGE_LOG;
+ coreOptions.set(CoreOptions.WRITE_MODE, writeMode);
+ }
+ if (writeMode == WriteMode.APPEND_ONLY) {
table = new AppendOnlyFileStoreTable(fileIO, tablePath,
tableSchema);
} else {
if (tableSchema.primaryKeys().isEmpty()) {
@@ -82,7 +90,6 @@ public class FileStoreTableFactory {
table = new ChangelogWithKeyFileStoreTable(fileIO, tablePath,
tableSchema);
}
}
-
return table.copy(dynamicOptions.toMap());
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index 68289fad7..5d9558032 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -248,6 +248,7 @@ public class SchemaManagerTest {
public void testChangelogTableWithFullCompaction() throws Exception {
Map<String, String> options = new HashMap<>();
options.put("key", "value");
+ options.put(CoreOptions.WRITE_MODE.key(),
WriteMode.CHANGE_LOG.toString());
options.put(
CoreOptions.CHANGELOG_PRODUCER.key(),
CoreOptions.ChangelogProducer.FULL_COMPACTION.toString());
diff --git
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FileStoreStreamE2eTest.java
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FileStoreStreamE2eTest.java
index c7273daf9..8f14840c2 100644
---
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FileStoreStreamE2eTest.java
+++
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FileStoreStreamE2eTest.java
@@ -75,6 +75,7 @@ public class FileStoreStreamE2eTest extends E2eTestBase {
+ " b INT,\n"
+ " rn BIGINT\n"
+ ") WITH (\n"
+ + " 'write-mode'='change-log',\n"
+ " 'bucket' = '3'\n"
+ ");";
diff --git
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/FlinkActionITCase.java
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/FlinkActionITCase.java
index a4590c743..8e721cba9 100644
---
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/FlinkActionITCase.java
+++
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/FlinkActionITCase.java
@@ -38,7 +38,8 @@ public class FlinkActionITCase extends CatalogITCaseBase {
}
protected List<String> ddl() {
- return Collections.singletonList("CREATE TABLE T (k INT, v STRING)");
+ return Collections.singletonList(
+ "CREATE TABLE T (k INT, v STRING) WITH
('write-mode'='change-log')");
}
@Test
diff --git
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/FlinkActionITCase.java
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/FlinkActionITCase.java
index a4590c743..8e721cba9 100644
---
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/FlinkActionITCase.java
+++
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/FlinkActionITCase.java
@@ -38,7 +38,8 @@ public class FlinkActionITCase extends CatalogITCaseBase {
}
protected List<String> ddl() {
- return Collections.singletonList("CREATE TABLE T (k INT, v STRING)");
+ return Collections.singletonList(
+ "CREATE TABLE T (k INT, v STRING) WITH
('write-mode'='change-log')");
}
@Test
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index c572d0309..368a9e1ec 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -59,7 +59,10 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
@Override
protected List<String> ddl() {
- String options = changelogFile ? " WITH('changelog-producer'='input')"
: "";
+ String options =
+ changelogFile
+ ? "
WITH('write-mode'='change-log','changelog-producer'='input')"
+ : "";
return Arrays.asList(
"CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)"
+ options,
"CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING,
PRIMARY KEY (a) NOT ENFORCED)"
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
index 05070c5e9..c8d5af7d4 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.WriteMode;
import org.apache.paimon.flink.sink.FileStoreSink;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.flink.source.ContinuousFileStoreSource;
@@ -77,6 +78,7 @@ import java.util.stream.Stream;
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.CoreOptions.WRITE_MODE;
import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
import static org.apache.paimon.utils.FailingFileIO.retryArtificialException;
import static org.assertj.core.api.Assertions.assertThat;
@@ -442,6 +444,7 @@ public class FileStoreITCase extends AbstractTestBase {
options.set(PATH, FailingFileIO.getFailingPath(failingName,
temporaryPath));
}
options.set(FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+ options.set(WRITE_MODE, WriteMode.CHANGE_LOG);
return options;
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ForceCompactionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ForceCompactionITCase.java
index 50ab2d0aa..af54b356d 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ForceCompactionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ForceCompactionITCase.java
@@ -50,12 +50,15 @@ public class ForceCompactionITCase extends
CatalogITCaseBase {
+ " f0 INT\n, "
+ " f1 STRING\n, "
+ " f2 STRING\n"
- + ") PARTITIONED BY (f1)",
+ + ") PARTITIONED BY (f1)"
+ + " WITH (\n"
+ + "'write-mode' = 'change-log')",
"CREATE TABLE IF NOT EXISTS T1 (\n"
+ " f0 INT\n, "
+ " f1 STRING\n, "
+ " f2 STRING\n"
- + ")",
+ + ") WITH (\n"
+ + "'write-mode' = 'change-log')",
"CREATE TABLE IF NOT EXISTS T2 (\n"
+ " f0 INT\n, "
+ " f1 STRING\n, "
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index dbc12ffe2..37ad4a1e3 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.WriteMode;
import org.apache.paimon.flink.sink.FlinkTableSink;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
@@ -30,6 +31,8 @@ import org.apache.paimon.testutils.assertj.AssertionUtils;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.BlockingIterator;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -66,6 +69,7 @@ import static
org.apache.flink.table.planner.factories.TestValuesTableFactory.ch
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
+import static org.apache.paimon.CoreOptions.WRITE_MODE;
import static
org.apache.paimon.flink.AbstractFlinkTableFactory.buildPaimonTable;
import static
org.apache.paimon.flink.FlinkConnectorOptions.INFER_SCAN_PARALLELISM;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PARALLELISM;
@@ -523,7 +527,8 @@ public class ReadWriteTableITCase extends AbstractTestBase {
createTable(
Arrays.asList("currency STRING", "rate BIGINT", "dt
STRING"),
Collections.emptyList(),
- Collections.singletonList("dt"));
+ Collections.singletonList("dt"),
+ ImmutableMap.of(WRITE_MODE.key(),
WriteMode.CHANGE_LOG.toString()));
insertIntoFromTable(temporaryTable, table);
@@ -667,7 +672,8 @@ public class ReadWriteTableITCase extends AbstractTestBase {
createTable(
Arrays.asList("currency STRING", "rate BIGINT"),
Collections.emptyList(),
- Collections.emptyList());
+ Collections.emptyList(),
+ ImmutableMap.of(WRITE_MODE.key(),
WriteMode.CHANGE_LOG.toString()));
insertIntoFromTable(temporaryTable, table);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
index 18cf0d9a1..b614bf503 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.WriteMode;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamWriteBuilder;
@@ -39,6 +40,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import static
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
@@ -144,12 +146,14 @@ public class DeleteActionITCase extends ActionITCaseBase {
}
private void prepareTable(boolean hasPk) throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.WRITE_MODE.key(),
WriteMode.CHANGE_LOG.toString());
FileStoreTable table =
createFileStoreTable(
ROW_TYPE,
Collections.emptyList(),
hasPk ? Collections.singletonList("k") :
Collections.emptyList(),
- new HashMap<>());
+ options);
snapshotManager = table.snapshotManager();
StreamWriteBuilder streamWriteBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
index 3ffba193f..5c4908aaf 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.util;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.WriteMode;
import org.apache.paimon.flink.ReadWriteTableITCase;
import org.apache.paimon.flink.StreamingReadWriteTableWithKafkaLogITCase;
import org.apache.paimon.utils.BlockingIterator;
@@ -47,6 +48,7 @@ import java.util.concurrent.TimeoutException;
import static
org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
import static org.apache.paimon.CoreOptions.SCAN_MODE;
+import static org.apache.paimon.CoreOptions.WRITE_MODE;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
import static org.apache.paimon.flink.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
import static org.apache.paimon.flink.kafka.KafkaLogOptions.TOPIC;
@@ -149,6 +151,7 @@ public class ReadWriteTableTestUtil {
put(BOOTSTRAP_SERVERS.key(),
getBootstrapServers());
put(TOPIC.key(), topic);
put(CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false");
+ put(WRITE_MODE.key(),
WriteMode.CHANGE_LOG.toString());
}
});
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
index 83bb74d89..4f5dfccbb 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
@@ -627,6 +627,7 @@ public class PaimonStorageHandlerITCase {
conf.set(CatalogOptions.WAREHOUSE, path);
conf.set(CoreOptions.BUCKET, 2);
conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+ conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
Identifier identifier = Identifier.create(DATABASE_NAME,
tableNameNotNull);
Table table =
FileStoreTestUtils.createFileStoreTable(
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
index f83a6269e..88ab06cc2 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.hive.mapred;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.WriteMode;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
@@ -101,6 +102,7 @@ public class PaimonRecordReaderTest {
Options conf = new Options();
conf.set(CatalogOptions.WAREHOUSE, tempDir.toString());
conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+ conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
Table table =
FileStoreTestUtils.createFileStoreTable(
conf,
diff --git
a/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SimpleTableTestHelper.java
b/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SimpleTableTestHelper.java
index a7df4cee0..829f4fc00 100644
---
a/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SimpleTableTestHelper.java
+++
b/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SimpleTableTestHelper.java
@@ -19,6 +19,7 @@
package org.apache.paimon.spark;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.WriteMode;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
@@ -47,7 +48,8 @@ public class SimpleTableTestHelper {
public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
Map<String, String> options = new HashMap<>();
// orc is shaded, can not find shaded classes in ide
- options.put(CoreOptions.FILE_FORMAT.key(), "avro");
+ options.put(CoreOptions.FILE_FORMAT.key(),
CoreOptions.FileFormatType.AVRO.toString());
+ options.put(CoreOptions.WRITE_MODE.key(),
WriteMode.CHANGE_LOG.toString());
new SchemaManager(LocalFileIO.create(), path)
.createTable(
new Schema(
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
index a5f8b2978..191b09d5e 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
@@ -165,7 +165,7 @@ public abstract class SparkReadTestBase {
protected static void createTable(String tableName) {
spark.sql(
String.format(
- "CREATE TABLE paimon.default.%s (a INT NOT NULL, b
BIGINT, c STRING) TBLPROPERTIES ('file.format'='avro')",
+ "CREATE TABLE paimon.default.%s (a INT NOT NULL, b
BIGINT, c STRING) TBLPROPERTIES
('write-mode'='change-log','file.format'='avro')",
tableName));
}