This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d0fc63ca4d [spark] Support spark blob basic read write (#6368)
d0fc63ca4d is described below
commit d0fc63ca4d282c9e671f32449dc96045544e62db
Author: YeJunHao <[email protected]>
AuthorDate: Thu Oct 9 17:56:55 2025 +0800
[spark] Support spark blob basic read write (#6368)
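For context: the new behavior is driven by the blob-field table property. At
create time, SparkCatalog#toInitialSchema maps the named binary column to
Paimon's BlobType; at read time the shim-specific Spark*InternalRowWithBlob
surfaces the blob back to Spark as plain binary. A minimal usage sketch,
mirroring the BlobTestBase test added in this commit (table name and byte
values are illustrative; assumes a SparkSession named `spark`):

    spark.sql(
      """CREATE TABLE t (id INT, data STRING, picture BINARY) TBLPROPERTIES (
        |  'row-tracking.enabled'='true',
        |  'data-evolution.enabled'='true',
        |  'blob-field'='picture')""".stripMargin)
    spark.sql("INSERT INTO t VALUES (1, 'paimon', X'48656C6C6F')")
    // The blob column reads back as ordinary binary (here, the bytes of "Hello").
    spark.sql("SELECT id, data, picture FROM t").show()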
---
.../java/org/apache/paimon/spark/SparkCatalog.java | 14 +++++++++++-
.../org/apache/paimon/spark/SparkTypeUtils.java | 6 ++++++
.../paimon/spark/data/SparkInternalRow.scala | 22 +++++++++++++++++--
.../apache/spark/sql/paimon/shims/SparkShim.scala | 2 ++
.../apache/paimon/spark/sql/BlobTestBase.scala} | 25 ++++++++++++----------
.../spark/data/Spark3InternalRowWithBlob.scala} | 20 ++++++++---------
.../apache/spark/sql/paimon/shims/Spark3Shim.scala | 8 ++++++-
.../spark/data/Spark4InternalRowWithBlob.scala} | 20 ++++++++---------
.../apache/spark/sql/paimon/shims/Spark4Shim.scala | 8 ++++++-
9 files changed, 88 insertions(+), 37 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index a7bda93290..d388cb51bf 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -39,6 +39,7 @@ import org.apache.paimon.spark.catalog.functions.PaimonFunctions;
import org.apache.paimon.spark.catalog.functions.V1FunctionConverter;
import org.apache.paimon.spark.utils.CatalogUtils;
import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.ExceptionUtils;
@@ -92,6 +93,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
@@ -109,6 +111,7 @@ import static org.apache.paimon.spark.utils.CatalogUtils.isUpdateColumnDefaultVa
import static org.apache.paimon.spark.utils.CatalogUtils.removeCatalogName;
import static org.apache.paimon.spark.utils.CatalogUtils.toIdentifier;
import static org.apache.paimon.spark.utils.CatalogUtils.toUpdateColumnDefaultValue;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Spark {@link TableCatalog} for paimon. */
public class SparkCatalog extends SparkBaseCatalog
@@ -456,6 +459,7 @@ public class SparkCatalog extends SparkBaseCatalog
private Schema toInitialSchema(
StructType schema, Transform[] partitions, Map<String, String> properties) {
Map<String, String> normalizedProperties = new HashMap<>(properties);
+ String blobFieldName = properties.get(CoreOptions.BLOB_FIELD.key());
String provider = properties.get(TableCatalog.PROP_PROVIDER);
if (!usePaimon(provider)) {
if (isFormatTable(provider)) {
@@ -488,7 +492,15 @@ public class SparkCatalog extends SparkBaseCatalog
for (StructField field : schema.fields()) {
String name = field.name();
- DataType type = toPaimonType(field.dataType()).copy(field.nullable());
+ DataType type;
+ if (Objects.equals(blobFieldName, name)) {
+ checkArgument(
+ field.dataType() instanceof org.apache.spark.sql.types.BinaryType,
+ "The type of blob field must be binary");
+ type = new BlobType();
+ } else {
+ type = toPaimonType(field.dataType()).copy(field.nullable());
+ }
String comment = field.getComment().getOrElse(() -> null);
if (field.metadata().contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
String defaultValue =
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
index de14ef4316..dc2f8b30ac 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
@@ -23,6 +23,7 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.BooleanType;
import org.apache.paimon.types.CharType;
import org.apache.paimon.types.DataField;
@@ -161,6 +162,11 @@ public class SparkTypeUtils {
return DataTypes.BinaryType;
}
+ @Override
+ public DataType visit(BlobType blobType) {
+ return DataTypes.BinaryType;
+ }
+
@Override
public DataType visit(VarBinaryType varBinaryType) {
return DataTypes.BinaryType;
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
index b0916447c0..b3ac41598e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
@@ -18,11 +18,13 @@
package org.apache.paimon.spark.data
-import org.apache.paimon.types.RowType
+import org.apache.paimon.types.{DataTypeRoot, RowType}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.paimon.shims.SparkShimLoader
+import java.util.OptionalInt
+
abstract class SparkInternalRow extends InternalRow {
def replace(row: org.apache.paimon.data.InternalRow): SparkInternalRow
}
@@ -30,7 +32,23 @@ abstract class SparkInternalRow extends InternalRow {
object SparkInternalRow {
def create(rowType: RowType): SparkInternalRow = {
- SparkShimLoader.shim.createSparkInternalRow(rowType)
+ val fieldIndex = blobFieldIndex(rowType)
+ if (fieldIndex.isPresent) {
+ SparkShimLoader.shim.createSparkInternalRowWithBlob(rowType, fieldIndex.getAsInt)
+ } else {
+ SparkShimLoader.shim.createSparkInternalRow(rowType)
+ }
+ }
+
+ private def blobFieldIndex(rowType: RowType): OptionalInt = {
+ var i: Int = 0
+ while (i < rowType.getFieldCount) {
+ if (rowType.getTypeAt(i).getTypeRoot.equals(DataTypeRoot.BLOB)) {
+ return OptionalInt.of(i)
+ }
+ i += 1
+ }
+ OptionalInt.empty()
}
}
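The dispatch above returns a blob-aware row whenever the row type carries a
BLOB field. A minimal sketch of the effect, assuming Paimon's RowType/DataField
constructors and the BlobType used elsewhere in this commit:

    import java.util.Arrays
    import org.apache.paimon.spark.data.SparkInternalRow
    import org.apache.paimon.types.{BlobType, DataField, DataTypes, RowType}

    // Field 1 has type root BLOB, so create(...) hands back the shim's
    // Spark3InternalRowWithBlob / Spark4InternalRowWithBlob rather than the
    // plain Spark3InternalRow / Spark4InternalRow.
    val rowType = new RowType(Arrays.asList(
      new DataField(0, "id", DataTypes.INT()),
      new DataField(1, "picture", new BlobType())))
    val sparkRow: SparkInternalRow = SparkInternalRow.create(rowType)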
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
index eb34fac022..bb89ed7649 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
@@ -51,6 +51,8 @@ trait SparkShim {
def createSparkInternalRow(rowType: RowType): SparkInternalRow
+ def createSparkInternalRowWithBlob(rowType: RowType, blobFieldIndex: Int): SparkInternalRow
+
def createSparkArrayData(elementType: DataType): SparkArrayData
def createTable(
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
similarity index 56%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
copy to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
index b0916447c0..fa175f9e2b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
@@ -16,21 +16,24 @@
* limitations under the License.
*/
-package org.apache.paimon.spark.data
+package org.apache.paimon.spark.sql
-import org.apache.paimon.types.RowType
+import org.apache.paimon.spark.PaimonSparkTestBase
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.paimon.shims.SparkShimLoader
+import org.apache.spark.sql.Row
-abstract class SparkInternalRow extends InternalRow {
- def replace(row: org.apache.paimon.data.InternalRow): SparkInternalRow
-}
-
-object SparkInternalRow {
+class BlobTestBase extends PaimonSparkTestBase {
+ test("Blob: test basic") {
+ withTable("t") {
+ sql(
+ "CREATE TABLE t (id INT, data STRING, picture BINARY) TBLPROPERTIES
('row-tracking.enabled'='true', 'data-evolution.enabled'='true',
'blob-field'='picture')")
+ sql("INSERT INTO t VALUES (1, 'paimon', X'48656C6C6F')")
- def create(rowType: RowType): SparkInternalRow = {
- SparkShimLoader.shim.createSparkInternalRow(rowType)
+ checkAnswer(
+ sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t"),
+ Seq(Row(1, "paimon", Array[Byte](72, 101, 108, 108, 111), 0, 1))
+ )
+ }
}
}
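As a worked detail of the assertion: the binary literal in the INSERT is hex
for the ASCII string "Hello", which is exactly the byte array the test
expects, followed by the _ROW_ID and _SEQUENCE_NUMBER metadata columns that
row tracking appends:

    // X'48656C6C6F' decodes to the ASCII bytes of "Hello".
    val expected: Array[Byte] =
      "Hello".getBytes(java.nio.charset.StandardCharsets.UTF_8)
    // expected == Array[Byte](72, 101, 108, 108, 111)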
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/data/Spark3InternalRowWithBlob.scala
similarity index 70%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
copy to paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/data/Spark3InternalRowWithBlob.scala
index b0916447c0..5922239ec2 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
+++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/data/Spark3InternalRowWithBlob.scala
@@ -18,19 +18,17 @@
package org.apache.paimon.spark.data
+import org.apache.paimon.spark.AbstractSparkInternalRow
import org.apache.paimon.types.RowType
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.paimon.shims.SparkShimLoader
+class Spark3InternalRowWithBlob(rowType: RowType, blobFieldIndex: Int)
+ extends Spark3InternalRow(rowType) {
-abstract class SparkInternalRow extends InternalRow {
- def replace(row: org.apache.paimon.data.InternalRow): SparkInternalRow
-}
-
-object SparkInternalRow {
-
- def create(rowType: RowType): SparkInternalRow = {
- SparkShimLoader.shim.createSparkInternalRow(rowType)
+ override def getBinary(ordinal: Int): Array[Byte] = {
+ if (ordinal == blobFieldIndex) {
+ row.getBlob(ordinal).toData
+ } else {
+ super.getBinary(ordinal)
+ }
}
-
}
diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
index 74803626d6..276e22bdb9 100644
--- a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
+++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
@@ -21,7 +21,7 @@ package org.apache.spark.sql.paimon.shims
import org.apache.paimon.data.variant.Variant
import org.apache.paimon.spark.catalyst.analysis.Spark3ResolutionRules
import org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark3SqlExtensionsParser
-import org.apache.paimon.spark.data.{Spark3ArrayData, Spark3InternalRow, SparkArrayData, SparkInternalRow}
+import org.apache.paimon.spark.data.{Spark3ArrayData, Spark3InternalRow, Spark3InternalRowWithBlob, SparkArrayData, SparkInternalRow}
import org.apache.paimon.types.{DataType, RowType}
import org.apache.spark.sql.SparkSession
@@ -54,6 +54,12 @@ class Spark3Shim extends SparkShim {
new Spark3InternalRow(rowType)
}
+ override def createSparkInternalRowWithBlob(
+ rowType: RowType,
+ blobFieldIndex: Int): SparkInternalRow = {
+ new Spark3InternalRowWithBlob(rowType, blobFieldIndex)
+ }
+
override def createSparkArrayData(elementType: DataType): SparkArrayData = {
new Spark3ArrayData(elementType)
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala
similarity index 69%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
copy to paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala
index b0916447c0..836fe46b37 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/data/SparkInternalRow.scala
+++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/data/Spark4InternalRowWithBlob.scala
@@ -18,19 +18,19 @@
package org.apache.paimon.spark.data
+import org.apache.paimon.spark.AbstractSparkInternalRow
import org.apache.paimon.types.RowType
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.paimon.shims.SparkShimLoader
+import org.apache.spark.unsafe.types.VariantVal
-abstract class SparkInternalRow extends InternalRow {
- def replace(row: org.apache.paimon.data.InternalRow): SparkInternalRow
-}
-
-object SparkInternalRow {
+class Spark4InternalRowWithBlob(rowType: RowType, blobFieldIndex: Int)
+ extends Spark4InternalRow(rowType) {
- def create(rowType: RowType): SparkInternalRow = {
- SparkShimLoader.shim.createSparkInternalRow(rowType)
+ override def getBinary(ordinal: Int): Array[Byte] = {
+ if (ordinal == blobFieldIndex) {
+ row.getBlob(ordinal).toData
+ } else {
+ super.getBinary(ordinal)
+ }
}
-
}
diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
index 2ae52f7118..53f2b0b199 100644
--- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
+++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
@@ -21,7 +21,7 @@ package org.apache.spark.sql.paimon.shims
import org.apache.paimon.data.variant.{GenericVariant, Variant}
import org.apache.paimon.spark.catalyst.analysis.Spark4ResolutionRules
import org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark4SqlExtensionsParser
-import org.apache.paimon.spark.data.{Spark4ArrayData, Spark4InternalRow, SparkArrayData, SparkInternalRow}
+import org.apache.paimon.spark.data.{Spark4ArrayData, Spark4InternalRow, Spark4InternalRowWithBlob, SparkArrayData, SparkInternalRow}
import org.apache.paimon.types.{DataType, RowType}
import org.apache.spark.sql.SparkSession
@@ -55,6 +55,12 @@ class Spark4Shim extends SparkShim {
new Spark4InternalRow(rowType)
}
+ override def createSparkInternalRowWithBlob(
+ rowType: RowType,
+ blobFieldIndex: Int): SparkInternalRow = {
+ new Spark4InternalRowWithBlob(rowType, blobFieldIndex)
+ }
+
override def createSparkArrayData(elementType: DataType): SparkArrayData = {
new Spark4ArrayData(elementType)
}
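With both shims implementing the new factory method, callers resolve the
version-specific class through SparkShimLoader instead of referencing it
directly. A hedged sketch of the call site (reusing the rowType construction
from the earlier example, with the blob at ordinal 1):

    import java.util.Arrays
    import org.apache.paimon.types.{BlobType, DataField, DataTypes, RowType}
    import org.apache.spark.sql.paimon.shims.SparkShimLoader

    val rowType = new RowType(Arrays.asList(
      new DataField(0, "id", DataTypes.INT()),
      new DataField(1, "picture", new BlobType())))
    // Resolves to Spark3InternalRowWithBlob or Spark4InternalRowWithBlob,
    // depending on which shim implementation is on the classpath.
    val blobRow = SparkShimLoader.shim.createSparkInternalRowWithBlob(rowType, 1)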