This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 499ba1392 [core] FileStore should not be serializable (#3830)
499ba1392 is described below
commit 499ba1392f4e99c26b30a0802657feb46805c542
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jul 29 19:11:24 2024 +0800
[core] FileStore should not be serializable (#3830)
---
paimon-core/src/main/java/org/apache/paimon/FileStore.java | 3 +--
.../src/main/java/org/apache/paimon/KeyValueFileStore.java | 2 --
.../scala/org/apache/paimon/spark/commands/PaimonCommand.scala | 8 ++++----
.../org/apache/paimon/spark/commands/PaimonSparkWriter.scala | 4 +---
4 files changed, 6 insertions(+), 11 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index e943d38cf..dae508733 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -42,7 +42,6 @@ import org.apache.paimon.utils.TagManager;
import javax.annotation.Nullable;
-import java.io.Serializable;
import java.util.List;
/**
@@ -50,7 +49,7 @@ import java.util.List;
*
* @param <T> type of record to read and write.
*/
-public interface FileStore<T> extends Serializable {
+public interface FileStore<T> {
FileStorePathFactory pathFactory();
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index b1b7fc211..c17f1c252 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -61,8 +61,6 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
/** {@link FileStore} for querying and updating {@link KeyValue}s. */
public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
- private static final long serialVersionUID = 1L;
-
private final boolean crossPartitionUpdate;
private final RowType bucketKeyType;
private final RowType keyType;
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index d5dd1e19e..4a42e4f46 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -18,7 +18,7 @@
package org.apache.paimon.spark.commands
-import org.apache.paimon.deletionvectors.{BitmapDeletionVector, DeletionVector}
+import org.apache.paimon.deletionvectors.BitmapDeletionVector
import org.apache.paimon.fs.Path
import org.apache.paimon.index.IndexFileMeta
import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement,
IndexIncrement}
@@ -159,8 +159,8 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper {
val metadataCols = Seq(FILE_PATH, ROW_INDEX)
val filteredRelation = createNewScanPlan(candidateDataSplits, condition,
relation, metadataCols)
- val store = table.store()
- val location = table.location
+ val my_table = table
+ val location = my_table.location
createDataset(sparkSession, filteredRelation)
.select(FILE_PATH_COLUMN, ROW_INDEX_COLUMN)
.as[(String, Long)]
@@ -174,7 +174,7 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper {
val relativeFilePath = location.toUri.relativize(new
URI(filePath)).toString
val (partition, bucket) =
dataFileToPartitionAndBucket.toMap.apply(relativeFilePath)
- val pathFactory = store.pathFactory()
+ val pathFactory = my_table.store().pathFactory()
val partitionAndBucket = pathFactory
.relativePartitionAndBucketPath(partition, bucket)
.toString
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 62fb4689d..a5681e54e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -19,7 +19,6 @@
package org.apache.paimon.spark.commands
import org.apache.paimon.CoreOptions.WRITE_ONLY
-import org.apache.paimon.data.BinaryRow
import org.apache.paimon.deletionvectors.{DeletionVector,
DeletionVectorIndexFileMaintainer}
import org.apache.paimon.index.{BucketAssigner, SimpleHashBucketAssigner}
import org.apache.paimon.io.{CompactIncrement, DataIncrement, IndexIncrement}
@@ -207,12 +206,11 @@ case class PaimonSparkWriter(table: FileStoreTable) {
val sparkSession = deletionVectors.sparkSession
import sparkSession.implicits._
val snapshotId = table.snapshotManager().latestSnapshotId();
- val fileStore = table.store()
val serializedCommits = deletionVectors
.groupByKey(_.partitionAndBucket)
.mapGroups {
case (_, iter: Iterator[SparkDeletionVectors]) =>
- val indexHandler = fileStore.newIndexFileHandler()
+ val indexHandler = table.store().newIndexFileHandler()
var dvIndexFileMaintainer: DeletionVectorIndexFileMaintainer = null
while (iter.hasNext) {
val sdv: SparkDeletionVectors = iter.next()