This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c761c6428 [core] use append-only mode as the default write mode (#976)
c761c6428 is described below

commit c761c6428b51e0f70b1a516f036ecd4b658d8087
Author: Kerwin <[email protected]>
AuthorDate: Thu May 4 15:57:55 2023 +0800

    [core] use append-only mode as the default write mode (#976)
---
 docs/layouts/shortcodes/generated/core_configuration.html   |  4 ++--
 .../src/main/java/org/apache/paimon/CoreOptions.java        |  2 +-
 paimon-core/src/main/java/org/apache/paimon/WriteMode.java  |  3 +++
 .../java/org/apache/paimon/table/FileStoreTableFactory.java | 13 ++++++++++---
 .../java/org/apache/paimon/schema/SchemaManagerTest.java    |  1 +
 .../org/apache/paimon/tests/FileStoreStreamE2eTest.java     |  1 +
 .../java/org/apache/paimon/flink/FlinkActionITCase.java     |  3 ++-
 .../java/org/apache/paimon/flink/FlinkActionITCase.java     |  3 ++-
 .../org/apache/paimon/flink/ContinuousFileStoreITCase.java  |  5 ++++-
 .../test/java/org/apache/paimon/flink/FileStoreITCase.java  |  3 +++
 .../java/org/apache/paimon/flink/ForceCompactionITCase.java |  7 +++++--
 .../java/org/apache/paimon/flink/ReadWriteTableITCase.java  | 10 ++++++++--
 .../org/apache/paimon/flink/action/DeleteActionITCase.java  |  6 +++++-
 .../apache/paimon/flink/util/ReadWriteTableTestUtil.java    |  3 +++
 .../org/apache/paimon/hive/PaimonStorageHandlerITCase.java  |  1 +
 .../apache/paimon/hive/mapred/PaimonRecordReaderTest.java   |  2 ++
 .../java/org/apache/paimon/spark/SimpleTableTestHelper.java |  4 +++-
 .../java/org/apache/paimon/spark/SparkReadTestBase.java     |  2 +-
 18 files changed, 57 insertions(+), 16 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 6c704aa38..edf14f17f 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -394,9 +394,9 @@
         </tr>
         <tr>
             <td><h5>write-mode</h5></td>
-            <td style="word-wrap: break-word;">change-log</td>
+            <td style="word-wrap: break-word;">auto</td>
             <td><p>Enum</p></td>
-            <td>Specify the write mode for table.<br /><br />Possible 
values:<ul><li>"append-only": The table can only accept append-only insert 
operations. Neither data deduplication nor any primary key constraints will be 
done when inserting rows into paimon.</li><li>"change-log": The table can 
accept insert/delete/update operations.</li></ul></td>
+            <td>Specify the write mode for table.<br /><br />Possible 
values:<ul><li>"auto": The change-log for table with primary key, append-only 
for table without primary key.</li><li>"append-only": The table can only accept 
append-only insert operations. Neither data deduplication nor any primary key 
constraints will be done when inserting rows into paimon.</li><li>"change-log": 
The table can accept insert/delete/update operations.</li></ul></td>
         </tr>
         <tr>
             <td><h5>write-only</h5></td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index 7bb992cfa..0405d021f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -196,7 +196,7 @@ public class CoreOptions implements Serializable {
     public static final ConfigOption<WriteMode> WRITE_MODE =
             key("write-mode")
                     .enumType(WriteMode.class)
-                    .defaultValue(WriteMode.CHANGE_LOG)
+                    .defaultValue(WriteMode.AUTO)
                     .withDescription("Specify the write mode for table.");
 
     public static final ConfigOption<Boolean> WRITE_ONLY =
diff --git a/paimon-core/src/main/java/org/apache/paimon/WriteMode.java 
b/paimon-core/src/main/java/org/apache/paimon/WriteMode.java
index e815435ee..70c435af4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/WriteMode.java
+++ b/paimon-core/src/main/java/org/apache/paimon/WriteMode.java
@@ -25,6 +25,9 @@ import static 
org.apache.paimon.options.description.TextElement.text;
 
 /** Defines the write mode for paimon. */
 public enum WriteMode implements DescribedEnum {
+    AUTO(
+            "auto",
+            "The change-log for table with primary key, append-only for table 
without primary key."),
     APPEND_ONLY(
             "append-only",
             "The table can only accept append-only insert operations. Neither 
data deduplication nor any "
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
index 7a0d099b8..e2d634b75 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
@@ -72,8 +72,16 @@ public class FileStoreTableFactory {
     public static FileStoreTable create(
             FileIO fileIO, Path tablePath, TableSchema tableSchema, Options 
dynamicOptions) {
         FileStoreTable table;
-        if (Options.fromMap(tableSchema.options()).get(CoreOptions.WRITE_MODE)
-                == WriteMode.APPEND_ONLY) {
+        Options coreOptions = Options.fromMap(tableSchema.options());
+        WriteMode writeMode = coreOptions.get(CoreOptions.WRITE_MODE);
+        if (writeMode == WriteMode.AUTO) {
+            writeMode =
+                    tableSchema.primaryKeys().isEmpty()
+                            ? WriteMode.APPEND_ONLY
+                            : WriteMode.CHANGE_LOG;
+            coreOptions.set(CoreOptions.WRITE_MODE, writeMode);
+        }
+        if (writeMode == WriteMode.APPEND_ONLY) {
             table = new AppendOnlyFileStoreTable(fileIO, tablePath, 
tableSchema);
         } else {
             if (tableSchema.primaryKeys().isEmpty()) {
@@ -82,7 +90,6 @@ public class FileStoreTableFactory {
                 table = new ChangelogWithKeyFileStoreTable(fileIO, tablePath, 
tableSchema);
             }
         }
-
         return table.copy(dynamicOptions.toMap());
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java 
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index 68289fad7..5d9558032 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -248,6 +248,7 @@ public class SchemaManagerTest {
     public void testChangelogTableWithFullCompaction() throws Exception {
         Map<String, String> options = new HashMap<>();
         options.put("key", "value");
+        options.put(CoreOptions.WRITE_MODE.key(), 
WriteMode.CHANGE_LOG.toString());
         options.put(
                 CoreOptions.CHANGELOG_PRODUCER.key(),
                 CoreOptions.ChangelogProducer.FULL_COMPACTION.toString());
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FileStoreStreamE2eTest.java
 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FileStoreStreamE2eTest.java
index c7273daf9..8f14840c2 100644
--- 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FileStoreStreamE2eTest.java
+++ 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FileStoreStreamE2eTest.java
@@ -75,6 +75,7 @@ public class FileStoreStreamE2eTest extends E2eTestBase {
                         + "    b INT,\n"
                         + "    rn BIGINT\n"
                         + ") WITH (\n"
+                        + "    'write-mode'='change-log',\n"
                         + "    'bucket' = '3'\n"
                         + ");";
 
diff --git 
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/FlinkActionITCase.java
 
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/FlinkActionITCase.java
index a4590c743..8e721cba9 100644
--- 
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/FlinkActionITCase.java
+++ 
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/FlinkActionITCase.java
@@ -38,7 +38,8 @@ public class FlinkActionITCase extends CatalogITCaseBase {
     }
 
     protected List<String> ddl() {
-        return Collections.singletonList("CREATE TABLE T (k INT, v STRING)");
+        return Collections.singletonList(
+                "CREATE TABLE T (k INT, v STRING) WITH 
('write-mode'='change-log')");
     }
 
     @Test
diff --git 
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/FlinkActionITCase.java
 
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/FlinkActionITCase.java
index a4590c743..8e721cba9 100644
--- 
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/FlinkActionITCase.java
+++ 
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/FlinkActionITCase.java
@@ -38,7 +38,8 @@ public class FlinkActionITCase extends CatalogITCaseBase {
     }
 
     protected List<String> ddl() {
-        return Collections.singletonList("CREATE TABLE T (k INT, v STRING)");
+        return Collections.singletonList(
+                "CREATE TABLE T (k INT, v STRING) WITH 
('write-mode'='change-log')");
     }
 
     @Test
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index c572d0309..368a9e1ec 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -59,7 +59,10 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
 
     @Override
     protected List<String> ddl() {
-        String options = changelogFile ? " WITH('changelog-producer'='input')" 
: "";
+        String options =
+                changelogFile
+                        ? " 
WITH('write-mode'='change-log','changelog-producer'='input')"
+                        : "";
         return Arrays.asList(
                 "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)" 
+ options,
                 "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, 
PRIMARY KEY (a) NOT ENFORCED)"
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
index 05070c5e9..c8d5af7d4 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.WriteMode;
 import org.apache.paimon.flink.sink.FileStoreSink;
 import org.apache.paimon.flink.sink.FlinkSinkBuilder;
 import org.apache.paimon.flink.source.ContinuousFileStoreSource;
@@ -77,6 +78,7 @@ import java.util.stream.Stream;
 import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.FILE_FORMAT;
 import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.CoreOptions.WRITE_MODE;
 import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
 import static org.apache.paimon.utils.FailingFileIO.retryArtificialException;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -442,6 +444,7 @@ public class FileStoreITCase extends AbstractTestBase {
             options.set(PATH, FailingFileIO.getFailingPath(failingName, 
temporaryPath));
         }
         options.set(FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+        options.set(WRITE_MODE, WriteMode.CHANGE_LOG);
         return options;
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ForceCompactionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ForceCompactionITCase.java
index 50ab2d0aa..af54b356d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ForceCompactionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ForceCompactionITCase.java
@@ -50,12 +50,15 @@ public class ForceCompactionITCase extends 
CatalogITCaseBase {
                         + "  f0 INT\n, "
                         + "  f1 STRING\n, "
                         + "  f2 STRING\n"
-                        + ") PARTITIONED BY (f1)",
+                        + ") PARTITIONED BY (f1)"
+                        + " WITH (\n"
+                        + "'write-mode' = 'change-log')",
                 "CREATE TABLE IF NOT EXISTS T1 (\n"
                         + "  f0 INT\n, "
                         + "  f1 STRING\n, "
                         + "  f2 STRING\n"
-                        + ")",
+                        + ") WITH (\n"
+                        + "'write-mode' = 'change-log')",
                 "CREATE TABLE IF NOT EXISTS T2 (\n"
                         + "  f0 INT\n, "
                         + "  f1 STRING\n, "
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index dbc12ffe2..37ad4a1e3 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.WriteMode;
 import org.apache.paimon.flink.sink.FlinkTableSink;
 import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.Path;
@@ -30,6 +31,8 @@ import org.apache.paimon.testutils.assertj.AssertionUtils;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.BlockingIterator;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -66,6 +69,7 @@ import static 
org.apache.flink.table.planner.factories.TestValuesTableFactory.ch
 import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
 import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
+import static org.apache.paimon.CoreOptions.WRITE_MODE;
 import static 
org.apache.paimon.flink.AbstractFlinkTableFactory.buildPaimonTable;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.INFER_SCAN_PARALLELISM;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PARALLELISM;
@@ -523,7 +527,8 @@ public class ReadWriteTableITCase extends AbstractTestBase {
                 createTable(
                         Arrays.asList("currency STRING", "rate BIGINT", "dt 
STRING"),
                         Collections.emptyList(),
-                        Collections.singletonList("dt"));
+                        Collections.singletonList("dt"),
+                        ImmutableMap.of(WRITE_MODE.key(), 
WriteMode.CHANGE_LOG.toString()));
 
         insertIntoFromTable(temporaryTable, table);
 
@@ -667,7 +672,8 @@ public class ReadWriteTableITCase extends AbstractTestBase {
                 createTable(
                         Arrays.asList("currency STRING", "rate BIGINT"),
                         Collections.emptyList(),
-                        Collections.emptyList());
+                        Collections.emptyList(),
+                        ImmutableMap.of(WRITE_MODE.key(), 
WriteMode.CHANGE_LOG.toString()));
 
         insertIntoFromTable(temporaryTable, table);
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
index 18cf0d9a1..b614bf503 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.WriteMode;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
@@ -39,6 +40,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static 
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
@@ -144,12 +146,14 @@ public class DeleteActionITCase extends ActionITCaseBase {
     }
 
     private void prepareTable(boolean hasPk) throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.WRITE_MODE.key(), 
WriteMode.CHANGE_LOG.toString());
         FileStoreTable table =
                 createFileStoreTable(
                         ROW_TYPE,
                         Collections.emptyList(),
                         hasPk ? Collections.singletonList("k") : 
Collections.emptyList(),
-                        new HashMap<>());
+                        options);
         snapshotManager = table.snapshotManager();
         StreamWriteBuilder streamWriteBuilder =
                 table.newStreamWriteBuilder().withCommitUser(commitUser);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
index 3ffba193f..5c4908aaf 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.util;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.WriteMode;
 import org.apache.paimon.flink.ReadWriteTableITCase;
 import org.apache.paimon.flink.StreamingReadWriteTableWithKafkaLogITCase;
 import org.apache.paimon.utils.BlockingIterator;
@@ -47,6 +48,7 @@ import java.util.concurrent.TimeoutException;
 
 import static 
org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
 import static org.apache.paimon.CoreOptions.SCAN_MODE;
+import static org.apache.paimon.CoreOptions.WRITE_MODE;
 import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
 import static org.apache.paimon.flink.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
 import static org.apache.paimon.flink.kafka.KafkaLogOptions.TOPIC;
@@ -149,6 +151,7 @@ public class ReadWriteTableTestUtil {
                                 put(BOOTSTRAP_SERVERS.key(), 
getBootstrapServers());
                                 put(TOPIC.key(), topic);
                                 
put(CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false");
+                                put(WRITE_MODE.key(), 
WriteMode.CHANGE_LOG.toString());
                             }
                         });
 
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
index 83bb74d89..4f5dfccbb 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
@@ -627,6 +627,7 @@ public class PaimonStorageHandlerITCase {
         conf.set(CatalogOptions.WAREHOUSE, path);
         conf.set(CoreOptions.BUCKET, 2);
         conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+        conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
         Identifier identifier = Identifier.create(DATABASE_NAME, 
tableNameNotNull);
         Table table =
                 FileStoreTestUtils.createFileStoreTable(
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
index f83a6269e..88ab06cc2 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.hive.mapred;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.WriteMode;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
@@ -101,6 +102,7 @@ public class PaimonRecordReaderTest {
         Options conf = new Options();
         conf.set(CatalogOptions.WAREHOUSE, tempDir.toString());
         conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+        conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
         Table table =
                 FileStoreTestUtils.createFileStoreTable(
                         conf,
diff --git 
a/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SimpleTableTestHelper.java
 
b/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SimpleTableTestHelper.java
index a7df4cee0..829f4fc00 100644
--- 
a/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SimpleTableTestHelper.java
+++ 
b/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SimpleTableTestHelper.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.spark;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.WriteMode;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
@@ -47,7 +48,8 @@ public class SimpleTableTestHelper {
     public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
         Map<String, String> options = new HashMap<>();
         // orc is shaded, can not find shaded classes in ide
-        options.put(CoreOptions.FILE_FORMAT.key(), "avro");
+        options.put(CoreOptions.FILE_FORMAT.key(), 
CoreOptions.FileFormatType.AVRO.toString());
+        options.put(CoreOptions.WRITE_MODE.key(), 
WriteMode.CHANGE_LOG.toString());
         new SchemaManager(LocalFileIO.create(), path)
                 .createTable(
                         new Schema(
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
index a5f8b2978..191b09d5e 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
+++ 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
@@ -165,7 +165,7 @@ public abstract class SparkReadTestBase {
     protected static void createTable(String tableName) {
         spark.sql(
                 String.format(
-                        "CREATE TABLE paimon.default.%s (a INT NOT NULL, b 
BIGINT, c STRING) TBLPROPERTIES ('file.format'='avro')",
+                        "CREATE TABLE paimon.default.%s (a INT NOT NULL, b 
BIGINT, c STRING) TBLPROPERTIES 
('write-mode'='change-log','file.format'='avro')",
                         tableName));
     }
 

Reply via email to