This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f50507d233 [core] Clean up invalid branch cache and not cache system
table in caching catalog (#4681)
f50507d233 is described below
commit f50507d233adc5aa5472dc1fc220bb93177db00d
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Dec 12 13:22:43 2024 +0800
[core] Clean up invalid branch cache and not cache system table in caching
catalog (#4681)
---
.../org/apache/paimon/catalog/AbstractCatalog.java | 8 +-
.../org/apache/paimon/catalog/CachingCatalog.java | 40 ++++------
.../java/org/apache/paimon/catalog/Identifier.java | 16 ++++
.../apache/paimon/catalog/CachingCatalogTest.java | 92 +++++++++-------------
.../flink/procedure/DeleteBranchProcedure.java | 5 +-
.../apache/paimon/spark/DataFrameWriteTest.scala | 5 ++
.../org/apache/paimon/spark/PaimonSinkTest.scala | 5 ++
.../apache/paimon/spark/SparkGenericCatalog.java | 2 +-
.../spark/procedure/DeleteBranchProcedure.java | 23 ++++--
.../org/apache/paimon/spark/PaimonSinkTest.scala | 5 ++
.../apache/paimon/spark/PaimonSparkTestBase.scala | 1 -
.../paimon/spark/sql/DataFrameWriteTest.scala | 6 ++
12 files changed, 112 insertions(+), 96 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 2b277a29b8..b56fec279a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -381,7 +381,7 @@ public abstract class AbstractCatalog implements Catalog {
throw new TableNotExistException(identifier);
}
return table;
- } else if (isSpecifiedSystemTable(identifier)) {
+ } else if (identifier.isSystemTable()) {
Table originTable =
getDataOrFormatTable(
new Identifier(
@@ -519,12 +519,8 @@ public abstract class AbstractCatalog implements Catalog {
}
}
- public static boolean isSpecifiedSystemTable(Identifier identifier) {
- return identifier.getSystemTableName() != null;
- }
-
protected static boolean isTableInSystemDatabase(Identifier identifier) {
- return isSystemDatabase(identifier.getDatabaseName()) ||
isSpecifiedSystemTable(identifier);
+ return isSystemDatabase(identifier.getDatabaseName()) ||
identifier.isSystemTable();
}
protected static void checkNotSystemTable(Identifier identifier, String
method) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index f67f19700d..e92a589d41 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -26,7 +26,6 @@ import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.system.SystemTableLoader;
-import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SegmentsCache;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
@@ -48,7 +47,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
-import static org.apache.paimon.catalog.AbstractCatalog.isSpecifiedSystemTable;
import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
import static
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
import static
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
@@ -56,7 +54,7 @@ import static
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE
import static
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM;
import static
org.apache.paimon.options.CatalogOptions.CACHE_SNAPSHOT_MAX_NUM_PER_TABLE;
-import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
/** A {@link Catalog} to cache databases and tables and manifests. */
public class CachingCatalog extends DelegateCatalog {
@@ -203,6 +201,9 @@ public class CachingCatalog extends DelegateCatalog {
throws TableNotExistException {
super.dropTable(identifier, ignoreIfNotExists);
invalidateTable(identifier);
+ if (identifier.isMainTable()) {
+ invalidateAttachedTables(identifier);
+ }
}
@Override
@@ -227,26 +228,23 @@ public class CachingCatalog extends DelegateCatalog {
return table;
}
- if (isSpecifiedSystemTable(identifier)) {
+ // For a system table, do not cache it directly. Instead, cache the
origin table and then wrap
+ // it to generate the system table.
+ if (identifier.isSystemTable()) {
Identifier originIdentifier =
new Identifier(
identifier.getDatabaseName(),
identifier.getTableName(),
identifier.getBranchName(),
null);
- Table originTable = tableCache.getIfPresent(originIdentifier);
- if (originTable == null) {
- originTable = wrapped.getTable(originIdentifier);
- putTableCache(originIdentifier, originTable);
- }
+ Table originTable = getTable(originIdentifier);
table =
SystemTableLoader.load(
-
Preconditions.checkNotNull(identifier.getSystemTableName()),
+ checkNotNull(identifier.getSystemTableName()),
(FileStoreTable) originTable);
if (table == null) {
throw new TableNotExistException(identifier);
}
- putTableCache(identifier, table);
return table;
}
@@ -309,7 +307,7 @@ public class CachingCatalog extends DelegateCatalog {
public void onRemoval(Identifier identifier, Table table, @NonNull
RemovalCause cause) {
LOG.debug("Evicted {} from the table cache ({})", identifier,
cause);
if (RemovalCause.EXPIRED.equals(cause)) {
- tryInvalidateSysTables(identifier);
+ // nothing to clean up for now: system tables are no longer cached separately
}
}
}
@@ -317,24 +315,18 @@ public class CachingCatalog extends DelegateCatalog {
@Override
public void invalidateTable(Identifier identifier) {
tableCache.invalidate(identifier);
- tryInvalidateSysTables(identifier);
if (partitionCache != null) {
partitionCache.invalidate(identifier);
}
}
- private void tryInvalidateSysTables(Identifier identifier) {
- if (!isSpecifiedSystemTable(identifier)) {
- tableCache.invalidateAll(allSystemTables(identifier));
- }
- }
-
- private static Iterable<Identifier> allSystemTables(Identifier ident) {
- List<Identifier> tables = new ArrayList<>();
- for (String type : SYSTEM_TABLES) {
- tables.add(Identifier.fromString(ident.getFullName() +
SYSTEM_TABLE_SPLITTER + type));
+ /** Invalidates attached tables, such as cached branches. */
+ private void invalidateAttachedTables(Identifier identifier) {
+ for (@NonNull Identifier i : tableCache.asMap().keySet()) {
+ if (identifier.getTableName().equals(i.getTableName())) {
+ tableCache.invalidate(i);
+ }
}
- return tables;
}
// ================================== refresh
================================================
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java
index 72da69b67b..6cca6824e3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java
@@ -65,6 +65,10 @@ public class Identifier implements Serializable {
this.object = object;
}
+ public Identifier(String database, String table, @Nullable String branch) {
+ this(database, table, branch, null);
+ }
+
public Identifier(
String database, String table, @Nullable String branch, @Nullable
String systemTable) {
this.database = database;
@@ -119,6 +123,18 @@ public class Identifier implements Serializable {
return systemTable;
}
+ public boolean isMainTable() {
+ return getBranchName() == null && getSystemTableName() == null;
+ }
+
+ public boolean isBranch() {
+ return getBranchName() != null && getSystemTableName() == null;
+ }
+
+ public boolean isSystemTable() {
+ return getSystemTableName() != null;
+ }
+
private void splitObjectName() {
if (table != null) {
return;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
index 7567f682ae..4792e33c93 100644
---
a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
@@ -48,10 +48,8 @@ import org.junit.jupiter.api.Test;
import java.io.FileNotFoundException;
import java.time.Duration;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -101,14 +99,49 @@ class CachingCatalogTest extends CatalogTestBase {
@Test
public void testInvalidateSysTablesIfBaseTableIsDropped() throws Exception
{
- Catalog catalog = new CachingCatalog(this.catalog);
+ TestableCachingCatalog catalog =
+ new TestableCachingCatalog(this.catalog, EXPIRATION_TTL,
ticker);
Identifier tableIdent = new Identifier("db", "tbl");
catalog.createTable(new Identifier("db", "tbl"), DEFAULT_TABLE_SCHEMA,
false);
Identifier sysIdent = new Identifier("db", "tbl$files");
+ // getting a system table only caches its origin table
catalog.getTable(sysIdent);
+ assertThat(catalog.tableCache.asMap()).containsKey(tableIdent);
+ assertThat(catalog.tableCache.asMap()).doesNotContainKey(sysIdent);
+ // test case sensitivity
+ Identifier sysIdent1 = new Identifier("db", "tbl$SNAPSHOTS");
+ catalog.getTable(sysIdent1);
+ assertThat(catalog.tableCache.asMap()).doesNotContainKey(sysIdent1);
+
catalog.dropTable(tableIdent, false);
+ assertThat(catalog.tableCache.asMap()).doesNotContainKey(tableIdent);
assertThatThrownBy(() -> catalog.getTable(sysIdent))
.hasMessage("Table db.tbl does not exist.");
+ assertThatThrownBy(() -> catalog.getTable(sysIdent1))
+ .hasMessage("Table db.tbl does not exist.");
+ }
+
+ @Test
+ public void testInvalidateBranchIfBaseTableIsDropped() throws Exception {
+ TestableCachingCatalog catalog =
+ new TestableCachingCatalog(this.catalog, EXPIRATION_TTL,
ticker);
+ Identifier tableIdent = new Identifier("db", "tbl");
+ catalog.createTable(new Identifier("db", "tbl"), DEFAULT_TABLE_SCHEMA,
false);
+ catalog.getTable(tableIdent).createBranch("b1");
+
+ Identifier branchIdent = new Identifier("db", "tbl$branch_b1");
+ Identifier branchSysIdent = new Identifier("db",
"tbl$branch_b1$FILES");
+ // getting a system table only caches its origin table (here, the branch table)
+ catalog.getTable(branchSysIdent);
+ assertThat(catalog.tableCache.asMap()).containsKey(branchIdent);
+
assertThat(catalog.tableCache.asMap()).doesNotContainKey(branchSysIdent);
+
+ catalog.dropTable(tableIdent, false);
+ assertThat(catalog.tableCache.asMap()).doesNotContainKey(branchIdent);
+ assertThatThrownBy(() -> catalog.getTable(branchIdent))
+ .hasMessage("Table db.tbl$branch_b1 does not exist.");
+ assertThatThrownBy(() -> catalog.getTable(branchSysIdent))
+ .hasMessage("Table db.tbl$branch_b1 does not exist.");
}
@Test
@@ -175,59 +208,6 @@ class CachingCatalogTest extends CatalogTestBase {
assertThat(catalog.remainingAgeFor(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION);
}
- @Test
- public void testCacheExpirationEagerlyRemovesSysTables() throws Exception {
- TestableCachingCatalog catalog =
- new TestableCachingCatalog(this.catalog, EXPIRATION_TTL,
ticker);
-
- Identifier tableIdent = new Identifier("db", "tbl");
- catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
- Table table = catalog.getTable(tableIdent);
- assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
- assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(Duration.ZERO);
-
- ticker.advance(HALF_OF_EXPIRATION);
- assertThat(catalog.tableCache().asMap()).containsKey(tableIdent);
-
assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION);
-
- for (Identifier sysTable : sysTables(tableIdent)) {
- catalog.getTable(sysTable);
- }
-
assertThat(catalog.tableCache().asMap()).containsKeys(sysTables(tableIdent));
- assertThat(Arrays.stream(sysTables(tableIdent)).map(catalog::ageOf))
- .isNotEmpty()
- .allMatch(age -> age.isPresent() &&
age.get().equals(Duration.ZERO));
-
- assertThat(catalog.remainingAgeFor(tableIdent))
- .as("Loading a non-cached sys table should refresh the main
table's age")
- .isEqualTo(Optional.of(EXPIRATION_TTL));
-
- // Move time forward and access already cached sys tables.
- ticker.advance(HALF_OF_EXPIRATION);
- for (Identifier sysTable : sysTables(tableIdent)) {
- catalog.getTable(sysTable);
- }
- assertThat(Arrays.stream(sysTables(tableIdent)).map(catalog::ageOf))
- .isNotEmpty()
- .allMatch(age -> age.isPresent() &&
age.get().equals(Duration.ZERO));
-
- assertThat(catalog.remainingAgeFor(tableIdent))
- .as("Accessing a cached sys table should not affect the main
table's age")
- .isEqualTo(Optional.of(HALF_OF_EXPIRATION));
-
- // Move time forward so the data table drops.
- ticker.advance(HALF_OF_EXPIRATION);
- assertThat(catalog.tableCache().asMap()).doesNotContainKey(tableIdent);
-
- Arrays.stream(sysTables(tableIdent))
- .forEach(
- sysTable ->
- assertThat(catalog.tableCache().asMap())
- .as(
- "When a data table expires,
its sys tables should expire regardless of age")
- .doesNotContainKeys(sysTable));
- }
-
@Test
public void testPartitionCache() throws Exception {
TestableCachingCatalog catalog =
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java
index c95fd62bee..56c6490286 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java
@@ -49,7 +49,10 @@ public class DeleteBranchProcedure extends ProcedureBase {
})
public String[] call(ProcedureContext procedureContext, String tableId,
String branchStr)
throws Catalog.TableNotExistException {
-
catalog.getTable(Identifier.fromString(tableId)).deleteBranches(branchStr);
+ Identifier identifier = Identifier.fromString(tableId);
+ catalog.getTable(identifier).deleteBranches(branchStr);
+ catalog.invalidateTable(
+ new Identifier(identifier.getDatabaseName(),
identifier.getTableName(), branchStr));
return new String[] {"Success"};
}
}
diff --git
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/DataFrameWriteTest.scala
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/DataFrameWriteTest.scala
index a3cecfc72e..cb449edb4c 100644
---
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/DataFrameWriteTest.scala
+++
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/DataFrameWriteTest.scala
@@ -18,10 +18,15 @@
package org.apache.paimon.spark
+import org.apache.spark.SparkConf
import org.junit.jupiter.api.Assertions
class DataFrameWriteTest extends PaimonSparkTestBase {
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false")
+ }
+
test("Paimon: DataFrameWrite.saveAsTable") {
import testImplicits._
diff --git
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
index 18fb9e116b..ab4a9bcd9d 100644
---
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
+++
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
@@ -18,6 +18,7 @@
package org.apache.paimon.spark
+import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{col, mean, window}
@@ -27,6 +28,10 @@ import java.sql.Date
class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false")
+ }
+
import testImplicits._
test("Paimon Sink: forEachBatch") {
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
index 9957f0cdf9..63d75a53ef 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -186,7 +186,7 @@ public class SparkGenericCatalog extends SparkBaseCatalog
implements CatalogExte
@Override
public void invalidateTable(Identifier ident) {
// We do not need to check whether the table exists and whether
- // it is an Paimon table to reduce remote service requests.
+ // it is a Paimon table to reduce remote service requests.
sparkCatalog.invalidateTable(ident);
asTableCatalog().invalidateTable(ident);
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java
index e398eee026..4a01c33d6a 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java
@@ -18,6 +18,8 @@
package org.apache.paimon.spark.procedure;
+import org.apache.paimon.spark.catalog.WithPaimonCatalog;
+
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -61,13 +63,20 @@ public class DeleteBranchProcedure extends BaseProcedure {
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
String branchStr = args.getString(1);
- return modifyPaimonTable(
- tableIdent,
- table -> {
- table.deleteBranches(branchStr);
- InternalRow outputRow = newInternalRow(true);
- return new InternalRow[] {outputRow};
- });
+ InternalRow[] result =
+ modifyPaimonTable(
+ tableIdent,
+ table -> {
+ table.deleteBranches(branchStr);
+ InternalRow outputRow = newInternalRow(true);
+ return new InternalRow[] {outputRow};
+ });
+ ((WithPaimonCatalog) tableCatalog())
+ .paimonCatalog()
+ .invalidateTable(
+ new org.apache.paimon.catalog.Identifier(
+ tableIdent.namespace()[0], tableIdent.name(),
branchStr));
+ return result;
}
public static ProcedureBuilder builder() {
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
index 63203122ac..61bf552494 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
@@ -18,6 +18,7 @@
package org.apache.paimon.spark
+import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{col, mean, window}
@@ -27,6 +28,10 @@ import java.sql.Date
class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false")
+ }
+
import testImplicits._
test("Paimon Sink: forEachBatch") {
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index 605b2e6ca5..867b3e5e33 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -66,7 +66,6 @@ class PaimonSparkTestBase
super.sparkConf
.set("spark.sql.catalog.paimon", classOf[SparkCatalog].getName)
.set("spark.sql.catalog.paimon.warehouse", tempDBDir.getCanonicalPath)
- .set("spark.sql.catalog.paimon.cache-enabled", "false")
.set("spark.sql.extensions",
classOf[PaimonSparkSessionExtensions].getName)
.set("spark.serializer", serializer)
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
index a0a94afacf..edd092c85c 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
@@ -20,6 +20,7 @@ package org.apache.paimon.spark.sql
import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DecimalType
import org.junit.jupiter.api.Assertions
@@ -27,6 +28,11 @@ import org.junit.jupiter.api.Assertions
import java.sql.{Date, Timestamp}
class DataFrameWriteTest extends PaimonSparkTestBase {
+
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false")
+ }
+
import testImplicits._
test("Paimon: DataFrameWrite.saveAsTable") {