This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4cb9520bb0 [spark] Reduce redundant loadTable (#6743)
4cb9520bb0 is described below
commit 4cb9520bb0991adb267ff57e3931241f42843e6f
Author: summaryzb <[email protected]>
AuthorDate: Sun May 24 10:19:02 2026 +0800
[spark] Reduce redundant loadTable (#6743)
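Previously `modifyPaimonTable` and `createRelation` in `BaseProcedure`
each loaded the table, so a procedure that used both loaded the same table
more than once. Optimize this by reusing the already-loaded table and
removing the redundant loadTable call.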
---
.../paimon/spark/procedure/BaseProcedure.java | 23 +++++++++++-----------
.../paimon/spark/procedure/CompactProcedure.java | 7 ++++---
.../procedure/CreateGlobalIndexProcedure.java | 7 ++++---
.../paimon/spark/procedure/RescaleProcedure.java | 13 ++++++------
.../spark/procedure/RewriteFileIndexProcedure.java | 7 ++++---
5 files changed, 29 insertions(+), 28 deletions(-)
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/BaseProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/BaseProcedure.java
index 2bf38c9fc7..c61f0c1fa5 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/BaseProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/BaseProcedure.java
@@ -76,22 +76,17 @@ abstract class BaseProcedure implements Procedure {
protected <T> T modifyPaimonTable(
Identifier ident, Function<org.apache.paimon.table.Table, T> func)
{
- return execute(ident, true, func);
- }
-
- private <T> T execute(
- Identifier ident,
- boolean refreshSparkCache,
- Function<org.apache.paimon.table.Table, T> func) {
SparkTable sparkTable = loadSparkTable(ident);
org.apache.paimon.table.Table table = sparkTable.getTable();
-
T result = func.apply(table);
+ refreshSparkCache(ident, sparkTable);
+ return result;
+ }
- if (refreshSparkCache) {
- refreshSparkCache(ident, sparkTable);
- }
-
+ protected <T> T modifySparkTable(Identifier ident, Function<SparkTable, T>
func) {
+ SparkTable sparkTable = loadSparkTable(ident);
+ T result = func.apply(sparkTable);
+ refreshSparkCache(ident, sparkTable);
return result;
}
@@ -114,6 +109,10 @@ abstract class BaseProcedure implements Procedure {
loadSparkTable(ident), Option.apply(tableCatalog),
Option.apply(ident));
}
+ protected DataSourceV2Relation createRelation(Identifier ident, Table
table) {
+ return DataSourceV2Relation.create(table, Option.apply(tableCatalog),
Option.apply(ident));
+ }
+
protected void refreshSparkCache(Identifier ident, Table table) {
DataSourceV2Relation relation =
DataSourceV2Relation.create(table, Option.apply(tableCatalog),
Option.apply(ident));
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 7f8c89dcba..274080cd7a 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -184,9 +184,10 @@ public class CompactProcedure extends BaseProcedure {
partitions == null || where == null,
"partitions and where cannot be used together.");
String finalWhere = partitions != null ?
SparkProcedureUtils.toWhere(partitions) : where;
- return modifyPaimonTable(
+ return modifySparkTable(
tableIdent,
- t -> {
+ sparkTable -> {
+ org.apache.paimon.table.Table t = sparkTable.getTable();
checkArgument(t instanceof FileStoreTable);
FileStoreTable table = (FileStoreTable) t;
CoreOptions coreOptions = table.coreOptions();
@@ -195,7 +196,7 @@ public class CompactProcedure extends BaseProcedure {
"order_by should not contain partition cols,
because it is meaningless, your order_by cols are %s, and partition cols are
%s",
sortColumns,
table.partitionKeys());
- DataSourceV2Relation relation = createRelation(tableIdent);
+ DataSourceV2Relation relation = createRelation(tableIdent,
sparkTable);
PartitionPredicate partitionPredicate =
SparkProcedureUtils.convertToPartitionPredicate(
finalWhere,
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
index ddc7477bf7..e25464b173 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
@@ -106,10 +106,11 @@ public class CreateGlobalIndexProcedure extends
BaseProcedure {
LOG.info("Starting to build index for table " + tableIdent + " WHERE:
" + finalWhere);
- return modifyPaimonTable(
+ return modifySparkTable(
tableIdent,
- t -> {
+ sparkTable -> {
try {
+ org.apache.paimon.table.Table t =
sparkTable.getTable();
checkArgument(
t instanceof FileStoreTable,
"Only FileStoreTable supports global index
creation.");
@@ -125,7 +126,7 @@ public class CreateGlobalIndexProcedure extends
BaseProcedure {
"Column '%s' does not exist in table '%s'.",
column,
tableIdent);
- DataSourceV2Relation relation =
createRelation(tableIdent);
+ DataSourceV2Relation relation =
createRelation(tableIdent, sparkTable);
PartitionPredicate partitionPredicate =
SparkProcedureUtils.convertToPartitionPredicate(
finalWhere,
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RescaleProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RescaleProcedure.java
index 94e1619806..4a4c15ceff 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RescaleProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RescaleProcedure.java
@@ -106,9 +106,10 @@ public class RescaleProcedure extends BaseProcedure {
"partitions and where cannot be used together.");
String finalWhere = partitions != null ?
SparkProcedureUtils.toWhere(partitions) : where;
- return modifyPaimonTable(
+ return modifySparkTable(
tableIdent,
- table -> {
+ sparkTable -> {
+ org.apache.paimon.table.Table table =
sparkTable.getTable();
checkArgument(table instanceof FileStoreTable);
FileStoreTable fileStoreTable = (FileStoreTable) table;
@@ -130,7 +131,7 @@ public class RescaleProcedure extends BaseProcedure {
String.valueOf(snapshot.id()));
fileStoreTable = fileStoreTable.copy(dynamicOptions);
- DataSourceV2Relation relation = createRelation(tableIdent);
+ DataSourceV2Relation relation = createRelation(tableIdent,
sparkTable);
PartitionPredicate partitionPredicate =
SparkProcedureUtils.convertToPartitionPredicate(
finalWhere,
@@ -144,7 +145,7 @@ public class RescaleProcedure extends BaseProcedure {
"When rescaling postpone bucket tables, you
must provide the resulting bucket number.");
}
- execute(fileStoreTable, bucketNum, partitionPredicate,
tableIdent);
+ execute(fileStoreTable, bucketNum, partitionPredicate,
relation);
InternalRow internalRow = newInternalRow(true);
return new InternalRow[] {internalRow};
@@ -155,9 +156,7 @@ public class RescaleProcedure extends BaseProcedure {
FileStoreTable table,
@Nullable Integer bucketNum,
PartitionPredicate partitionPredicate,
- Identifier tableIdent) {
- DataSourceV2Relation relation = createRelation(tableIdent);
-
+ DataSourceV2Relation relation) {
SnapshotReader snapshotReader = table.newSnapshotReader();
if (partitionPredicate != null) {
snapshotReader =
snapshotReader.withPartitionFilter(partitionPredicate);
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RewriteFileIndexProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RewriteFileIndexProcedure.java
index d45bf57962..ea48a449fc 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RewriteFileIndexProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RewriteFileIndexProcedure.java
@@ -96,11 +96,12 @@ public class RewriteFileIndexProcedure extends
BaseProcedure {
Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
String where = args.isNullAt(1) ? null : args.getString(1);
- return modifyPaimonTable(
+ return modifySparkTable(
tableIdent,
- t -> {
+ sparkTable -> {
+ org.apache.paimon.table.Table t = sparkTable.getTable();
FileStoreTable table = (FileStoreTable) t;
- DataSourceV2Relation relation = createRelation(tableIdent);
+ DataSourceV2Relation relation = createRelation(tableIdent,
sparkTable);
PartitionPredicate partitionPredicate =
SparkProcedureUtils.convertToPartitionPredicate(
where,