This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 99a191027 [spark] override equals as a preparative step for
performance optimization (#2411)
99a191027 is described below
commit 99a1910279dbdb1b18f07a4726d6a3c64f0e7a90
Author: Yann Byron <[email protected]>
AuthorDate: Wed Nov 29 10:14:13 2023 +0800
[spark] override equals as a preparative step for performance optimization
(#2411)
---
.../apache/paimon/spark/SparkInputPartition.java | 14 +++++++++++
.../apache/paimon/spark/SparkReaderFactory.java | 14 +++++++++++
.../java/org/apache/paimon/spark/SparkScan.java | 16 +------------
.../org/apache/paimon/spark/PaimonBatch.scala} | 28 ++++++++++++----------
4 files changed, 44 insertions(+), 28 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInputPartition.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInputPartition.java
index 5c22a26e0..47f32ab42 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInputPartition.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInputPartition.java
@@ -36,4 +36,25 @@ public class SparkInputPartition implements InputPartition {
     public Split split() {
         return split;
     }
+
+    /** Two partitions of the same class are equal when they wrap equal {@link Split}s. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        SparkInputPartition that = (SparkInputPartition) o;
+        return this.split.equals(that.split);
+    }
+
+    /** Derived from the wrapped split only, so it stays consistent with {@link #equals(Object)}. */
+    @Override
+    public int hashCode() {
+        return split.hashCode();
+    }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java
index 279aa5853..986183832 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java
@@ -61,4 +61,25 @@ public class SparkReaderFactory implements PartitionReaderFactory {
         SparkInternalRow row = new SparkInternalRow(readBuilder.readType());
         return new SparkInputPartitionReader(ioManager, iterator, row);
     }
+
+    /** Two factories of the same class are equal when their {@code readBuilder}s are equal. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        SparkReaderFactory that = (SparkReaderFactory) o;
+        return this.readBuilder.equals(that.readBuilder);
+    }
+
+    /** Derived from the read builder only, so it stays consistent with {@link #equals(Object)}. */
+    @Override
+    public int hashCode() {
+        return readBuilder.hashCode();
+    }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
index 04d959da6..891420beb 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScan.java
@@ -25,8 +25,6 @@ import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.spark.sql.connector.read.Batch;
-import org.apache.spark.sql.connector.read.InputPartition;
-import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.read.SupportsReportStatistics;
@@ -66,19 +64,9 @@ public class SparkScan implements Scan, SupportsReportStatistics {
 
     @Override
     public Batch toBatch() {
-        return new Batch() {
-            @Override
-            public InputPartition[] planInputPartitions() {
-                return splits().stream()
-                        .map(SparkInputPartition::new)
-                        .toArray(InputPartition[]::new);
-            }
-
-            @Override
-            public PartitionReaderFactory createReaderFactory() {
-                return new SparkReaderFactory(readBuilder);
-            }
-        };
+        // Copy splits() into the Split[] form that PaimonBatch's constructor takes.
+        Split[] plannedSplits = splits().toArray(new Split[0]);
+        return new PaimonBatch(plannedSplits, readBuilder);
     }
 
     @Override
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInputPartition.java b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBatch.scala
similarity index 51%
copy from paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInputPartition.java
copy to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBatch.scala
index 5c22a26e0..b356cf90e 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInputPartition.java
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBatch.scala
@@ -15,25 +15,37 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 
-package org.apache.paimon.spark;
+package org.apache.paimon.spark
 
-import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.{ReadBuilder, Split}
 
-import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory}
 
-/** A Spark {@link InputPartition} for paimon. */
-public class SparkInputPartition implements InputPartition {
+/**
+ * A Spark [[Batch]] for paimon.
+ *
+ * Equality is based on the contents of `splits` (not on array identity) and on `readBuilder`;
+ * `hashCode` is overridden to agree with it.
+ */
+case class PaimonBatch(splits: Array[Split], readBuilder: ReadBuilder) extends Batch {
 
-    private static final long serialVersionUID = 1L;
+  override def planInputPartitions(): Array[InputPartition] =
+    splits.map(split => (new SparkInputPartition(split): InputPartition))
 
-    private final Split split;
+  override def createReaderFactory(): PartitionReaderFactory = new SparkReaderFactory(readBuilder)
 
-    public SparkInputPartition(Split split) {
-        this.split = split;
-    }
-
-    public Split split() {
-        return split;
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case other: PaimonBatch =>
+        this.splits.sameElements(other.splits) &&
+        readBuilder == other.readBuilder
+      case _ => false
     }
+  }
+
+  // The case-class-synthesized hashCode hashes `splits` by reference, which would violate the
+  // equals/hashCode contract for the content-based equals above, so hash the array by content.
+  override def hashCode(): Int =
+    31 * scala.util.hashing.MurmurHash3.arrayHash(splits) + readBuilder.##
 }