This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 99a191027 [spark] override equals as a preparative step for performance optimization (#2411)
99a191027 is described below

commit 99a1910279dbdb1b18f07a4726d6a3c64f0e7a90
Author: Yann Byron <[email protected]>
AuthorDate: Wed Nov 29 10:14:13 2023 +0800

    [spark] override equals as a preparative step for performance optimization (#2411)
---
 .../apache/paimon/spark/SparkInputPartition.java   | 14 +++++++++++
 .../apache/paimon/spark/SparkReaderFactory.java    | 14 +++++++++++
 .../java/org/apache/paimon/spark/SparkScan.java    | 16 +------------
 .../org/apache/paimon/spark/PaimonBatch.scala}     | 28 ++++++++++++----------
 4 files changed, 44 insertions(+), 28 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInputPartition.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInputPartition.java
index 5c22a26e0..47f32ab42 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInputPartition.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInputPartition.java
@@ -36,4 +36,25 @@ public class SparkInputPartition implements InputPartition {
     public Split split() {
         return split;
     }
+
+    /** Two partitions are equal when they wrap equal splits. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        SparkInputPartition that = (SparkInputPartition) o;
+        return this.split.equals(that.split);
+    }
+
+    /** Consistent with {@link #equals(Object)}, which compares only the wrapped split. */
+    @Override
+    public int hashCode() {
+        return split.hashCode();
+    }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java
index 279aa5853..986183832 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java
@@ -61,4 +61,25 @@ public class SparkReaderFactory implements PartitionReaderFactory {
         SparkInternalRow row = new SparkInternalRow(readBuilder.readType());
         return new SparkInputPartitionReader(ioManager, iterator, row);
     }
+
+    /** Two factories are equal when they were created from equal read builders. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        SparkReaderFactory that = (SparkReaderFactory) o;
+        return this.readBuilder.equals(that.readBuilder);
+    }
+
+    /** Consistent with {@link #equals(Object)}, which compares only the read builder. */
+    @Override
+    public int hashCode() {
+        return readBuilder.hashCode();
+    }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
index 04d959da6..891420beb 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
@@ -25,8 +25,6 @@ import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 
 import org.apache.spark.sql.connector.read.Batch;
-import org.apache.spark.sql.connector.read.InputPartition;
-import org.apache.spark.sql.connector.read.PartitionReaderFactory;
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.Statistics;
 import org.apache.spark.sql.connector.read.SupportsReportStatistics;
@@ -66,19 +64,8 @@ public class SparkScan implements Scan, SupportsReportStatistics {
 
     @Override
     public Batch toBatch() {
-        return new Batch() {
-            @Override
-            public InputPartition[] planInputPartitions() {
-                return splits().stream()
-                        .map(SparkInputPartition::new)
-                        .toArray(InputPartition[]::new);
-            }
-
-            @Override
-            public PartitionReaderFactory createReaderFactory() {
-                return new SparkReaderFactory(readBuilder);
-            }
-        };
+        // Unlike an anonymous Batch, PaimonBatch defines equals over its splits and read builder.
+        return new PaimonBatch(splits().toArray(new Split[0]), readBuilder);
     }
 
     @Override
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInputPartition.java b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBatch.scala
similarity index 51%
copy from paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInputPartition.java
copy to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBatch.scala
index 5c22a26e0..b356cf90e 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInputPartition.java
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBatch.scala
@@ -15,25 +15,37 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.paimon.spark
 
-package org.apache.paimon.spark;
+import org.apache.paimon.table.source.{ReadBuilder, Split}
 
-import org.apache.paimon.table.source.Split;
+import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory}
 
-import org.apache.spark.sql.connector.read.InputPartition;
+/**
+ * A Spark [[Batch]] for paimon.
+ *
+ * Equality is value-based: two batches are equal when they hold equal splits in the same order
+ * and equal read builders. `hashCode` is overridden to match, because the case-class default
+ * would hash the `splits` array by identity.
+ */
+case class PaimonBatch(splits: Array[Split], readBuilder: ReadBuilder) extends Batch {
 
-/** A Spark {@link InputPartition} for paimon. */
-public class SparkInputPartition implements InputPartition {
+  override def planInputPartitions(): Array[InputPartition] =
+    splits.map(split => new SparkInputPartition(split): InputPartition)
 
-    private static final long serialVersionUID = 1L;
+  override def createReaderFactory(): PartitionReaderFactory = new SparkReaderFactory(readBuilder)
 
-    private final Split split;
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case other: PaimonBatch =>
+        this.splits.sameElements(other.splits) &&
+        readBuilder == other.readBuilder
 
-    public SparkInputPartition(Split split) {
-        this.split = split;
-    }
-
-    public Split split() {
-        return split;
+      case _ => false
     }
+  }
+
+  // Must agree with equals: hash the array by content (element hashCodes), not by identity.
+  override def hashCode(): Int =
+    31 * scala.util.hashing.MurmurHash3.arrayHash(splits) + readBuilder.##
 }

Reply via email to