This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6bb161b5d6 [core] support vector on spark (#8019)
6bb161b5d6 is described below
commit 6bb161b5d6f209e82438ebb2da2d21be08835f37
Author: Stefanietry <[email protected]>
AuthorDate: Mon Jun 1 17:49:24 2026 +0800
[core] support vector on spark (#8019)
---
.../java/org/apache/paimon/hive/HiveTypeUtils.java | 6 +
.../PaimonObjectInspectorFactory.java | 4 +
.../paimon/spark/AbstractSparkInternalRow.java | 9 +-
.../org/apache/paimon/spark/DataConverter.java | 14 +++
.../java/org/apache/paimon/spark/SparkCatalog.java | 20 ++++
.../paimon/spark/SparkInternalRowWrapper.java | 24 +++-
.../java/org/apache/paimon/spark/SparkRow.java | 96 ++++++++++++++-
.../org/apache/paimon/spark/SparkTypeUtils.java | 14 ++-
.../apache/paimon/spark/SparkMultimodalITCase.java | 133 +++++++++++++++++++++
.../org/apache/paimon/spark/SparkTypeTest.java | 19 +++
10 files changed, 333 insertions(+), 6 deletions(-)
diff --git
a/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java
b/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java
index e4799341d1..23c01d1144 100644
---
a/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java
+++
b/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java
@@ -45,6 +45,7 @@ import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.types.VariantType;
+import org.apache.paimon.types.VectorType;
import org.apache.hadoop.hive.common.type.HiveChar;
import org.apache.hadoop.hive.common.type.HiveVarchar;
@@ -235,6 +236,11 @@ public class HiveTypeUtils {
return TypeInfoFactory.binaryTypeInfo;
}
+ /**
+ * Maps a VECTOR to a Hive list of its element type; the fixed vector length is not part of
+ * the resulting Hive type.
+ */
+ @Override
+ public TypeInfo visit(VectorType vectorType) {
+ return
TypeInfoFactory.getListTypeInfo(vectorType.getElementType().accept(this));
+ }
+
@Override
protected TypeInfo defaultMethod(org.apache.paimon.types.DataType
dataType) {
throw new UnsupportedOperationException("Unsupported type: " +
dataType);
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonObjectInspectorFactory.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonObjectInspectorFactory.java
index 9e9eb0f31f..9ad6a1a99b 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonObjectInspectorFactory.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonObjectInspectorFactory.java
@@ -28,6 +28,7 @@ import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimeType;
import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.types.VectorType;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
@@ -81,6 +82,9 @@ public class PaimonObjectInspectorFactory {
case ARRAY:
ArrayType arrayType = (ArrayType) logicalType;
return new
PaimonListObjectInspector(arrayType.getElementType());
+ // A VECTOR is inspected with the same list inspector as an ARRAY of its element type.
+ case VECTOR:
+ VectorType vectorType = (VectorType) logicalType;
+ return new
PaimonListObjectInspector(vectorType.getElementType());
case MAP:
MapType mapType = (MapType) logicalType;
return new PaimonMapObjectInspector(mapType.getKeyType(),
mapType.getValueType());
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/AbstractSparkInternalRow.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/AbstractSparkInternalRow.java
index 283077430e..5910c60744 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/AbstractSparkInternalRow.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/AbstractSparkInternalRow.java
@@ -25,6 +25,7 @@ import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeChecks;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VectorType;
import org.apache.paimon.utils.InternalRowUtils;
import org.apache.spark.sql.catalyst.util.ArrayData;
@@ -172,7 +173,13 @@ public abstract class AbstractSparkInternalRow extends
SparkInternalRow {
@Override
public ArrayData getArray(int ordinal) {
- return fromPaimon(row.getArray(ordinal), (ArrayType)
rowType.getTypeAt(ordinal));
+ DataType type = rowType.getTypeAt(ordinal);
+ // VECTOR columns are exposed to Spark as arrays, so both ARRAY and VECTOR are read here.
+ if (type instanceof ArrayType) {
+ return fromPaimon(row.getArray(ordinal), (ArrayType) type);
+ } else if (type instanceof VectorType) {
+ return DataConverter.fromPaimon(row.getVector(ordinal),
(VectorType) type);
+ }
+ throw new UnsupportedOperationException("Not an array type: " + type);
}
@Override
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/DataConverter.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/DataConverter.java
index 0b5ea89947..5c8026f461 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/DataConverter.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/DataConverter.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.spark.data.SparkArrayData;
import org.apache.paimon.spark.data.SparkInternalRow;
@@ -32,6 +33,7 @@ import org.apache.paimon.types.IntType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.MultisetType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VectorType;
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
import org.apache.spark.sql.catalyst.util.ArrayData;
@@ -58,6 +60,8 @@ public class DataConverter {
return fromPaimon((org.apache.paimon.data.Decimal) o);
case ARRAY:
return fromPaimon((InternalArray) o, (ArrayType) type);
+ // Vectors become Spark arrays; the overload below also verifies the declared length.
+ case VECTOR:
+ return fromPaimon((InternalVector) o, (VectorType) type);
case MAP:
case MULTISET:
return fromPaimon((InternalMap) o, type);
@@ -93,6 +97,16 @@ public class DataConverter {
return fromPaimonArrayElementType(array, arrayType.getElementType());
}
+ /**
+ * Converts a Paimon vector to Spark {@link ArrayData}, reusing the array element conversion.
+ *
+ * @param vector the Paimon vector; its size must equal the declared vector length
+ * @param vectorType the declared vector type (length and element type)
+ * @return the Spark array view of the vector
+ * @throws IllegalArgumentException if {@code vector.size()} differs from {@code
+ *     vectorType.getLength()}
+ */
+ public static ArrayData fromPaimon(InternalVector vector, VectorType
vectorType) {
+ if (vector.size() != vectorType.getLength()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Vector length mismatch. Expected %d but was %d.",
+ vectorType.getLength(), vector.size()));
+ }
+ return fromPaimonArrayElementType(vector, vectorType.getElementType());
+ }
+
private static ArrayData fromPaimonArrayElementType(InternalArray array,
DataType elementType) {
return SparkArrayData.create(elementType).replace(array);
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index c6d35f6d81..b299248a81 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -44,7 +44,9 @@ import org.apache.paimon.table.object.ObjectTable;
import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.ExceptionUtils;
+import org.apache.paimon.utils.Preconditions;
import org.apache.spark.sql.PaimonSparkSession$;
import org.apache.spark.sql.SparkSession;
@@ -73,6 +75,7 @@ import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.execution.datasources.DataSource;
import org.apache.spark.sql.execution.datasources.FileFormat;
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2;
+import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -563,6 +566,7 @@ public class SparkCatalog extends SparkBaseCatalog
List<String> blobFields = CoreOptions.blobField(properties);
Set<String> blobDescriptorFields = new
CoreOptions(properties).blobDescriptorField();
List<String> blobViewFields = CoreOptions.blobViewField(properties);
+ Set<String> vectorFields =
CoreOptions.fromMap(properties).vectorField();
String provider = properties.get(TableCatalog.PROP_PROVIDER);
if (!usePaimon(provider)) {
if (isFormatTable(provider)) {
@@ -609,6 +613,22 @@ public class SparkCatalog extends SparkBaseCatalog
field.dataType() instanceof
org.apache.spark.sql.types.BinaryType,
"The type of blob field must be binary");
type = new BlobType();
+ } else if (vectorFields.contains(field.name())) {
+ // A vector field is declared in Spark as ARRAY<elem>; its fixed length comes from
+ // the 'field.<name>.vector-dim' table property.
+ Preconditions.checkArgument(
+ field.dataType() instanceof ArrayType,
+ "The type of vector field '%s' must be array, but was %s.",
+ field.name(),
+ field.dataType().simpleString());
+ ArrayType arrayType = (ArrayType) field.dataType();
+ String dimKey = String.format("field.%s.vector-dim", field.name());
+ // Preconditions only substitutes '%s' from the trailing arguments, so they must be
+ // passed explicitly or the user sees a literal placeholder.
+ Preconditions.checkArgument(
+ properties.containsKey(dimKey),
+ "When setting '%s', you must also set '%s', where %s is the name of the vector field.",
+ CoreOptions.VECTOR_FIELD.key(),
+ dimKey,
+ field.name());
+ String dimValue = properties.get(dimKey);
+ int dim;
+ try {
+ dim = Integer.parseInt(dimValue);
+ } catch (NumberFormatException e) {
+ // Still an IllegalArgumentException (NumberFormatException's supertype), but
+ // names the offending property.
+ throw new IllegalArgumentException(
+ String.format("'%s' must be an integer, but was '%s'.", dimKey, dimValue), e);
+ }
+ type = DataTypes.VECTOR(dim, toPaimonType(arrayType.elementType()));
} else {
type = toPaimonType(field.dataType()).copy(field.nullable());
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
index 45a7c0af41..97b5771594 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
@@ -259,7 +259,20 @@ public class SparkInternalRowWrapper implements
InternalRow, Serializable {
@Override
public InternalVector getVector(int pos) {
- throw new UnsupportedOperationException("Not support VectorType yet.");
+ int actualPos = getActualFieldPosition(pos);
+ // A position of -1 (presumably a field missing from the wrapped row) reads as null.
+ if (actualPos == -1 || internalRow.isNullAt(actualPos)) {
+ return null;
+ }
+ // NOTE(review): the Spark type is looked up with `pos` while the value is read at
+ // `actualPos`; confirm tableSchema is indexed by the logical field position.
+ DataType dataType = tableSchema.fields()[pos].dataType();
+ return toSparkInternalVector(dataType,
internalRow.getArray(actualPos));
+ }
+
+ /**
+ * Wraps Spark array data as a Paimon vector. {@code dataType} must be the {@link ArrayType}
+ * of the vector; otherwise an UnsupportedOperationException is thrown.
+ */
+ private static InternalVector toSparkInternalVector(DataType dataType,
ArrayData arrayData) {
+ if (!(dataType instanceof ArrayType)) {
+ throw new UnsupportedOperationException("Not a vector type: " +
dataType);
+ }
+ ArrayType arrayType = (ArrayType) dataType;
+ return new SparkInternalVector(arrayData, arrayType.elementType());
}
@Override
@@ -435,7 +448,7 @@ public class SparkInternalRowWrapper implements
InternalRow, Serializable {
@Override
public InternalVector getVector(int pos) {
- throw new UnsupportedOperationException("Not support VectorType
yet.");
+ // Element of an array of vectors: elementType is expected to be each element's ArrayType.
+ // NOTE(review): unlike the row-level getVector, a null element is not checked here.
+ return toSparkInternalVector(elementType, arrayData.getArray(pos));
}
@Override
@@ -452,6 +465,13 @@ public class SparkInternalRowWrapper implements
InternalRow, Serializable {
}
}
+ /**
+ * Adapts Spark {@link ArrayData} to Paimon's {@link InternalVector}; element access is
+ * inherited unchanged from {@link SparkInternalArray}.
+ */
+ public static class SparkInternalVector extends SparkInternalArray
implements InternalVector {
+ public SparkInternalVector(ArrayData arrayData, DataType elementType) {
+ super(arrayData, elementType);
+ }
+ }
+
/** adapt to spark internal map. */
public static class SparkInternalMap implements InternalMap {
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
index 84767db9ab..6dea5a5d29 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
@@ -20,6 +20,7 @@ package org.apache.paimon.spark;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.BinaryVector;
import org.apache.paimon.data.Blob;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalArray;
@@ -35,6 +36,7 @@ import org.apache.paimon.types.DateType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VectorType;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.UriReaderFactory;
@@ -48,6 +50,7 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -168,7 +171,10 @@ public class SparkRow implements InternalRow, Serializable
{
@Override
public InternalVector getVector(int pos) {
- throw new UnsupportedOperationException("Not support VectorType yet.");
+ // Null stays null; otherwise the external Spark value is copied into a Paimon vector.
+ if (row.isNullAt(pos)) {
+ return null;
+ }
+ return toPaimonVector((VectorType) type.getTypeAt(pos), row.get(pos));
}
@Override
@@ -426,4 +432,92 @@ public class SparkRow implements InternalRow, Serializable
{
return res;
}
}
+
+ private static InternalVector toPaimonVector(VectorType vectorType, Object
vector) {
+ if (vector == null) {
+ return null;
+ }
+ if (vector instanceof boolean[]) {
+ return BinaryVector.fromPrimitiveArray((boolean[]) vector);
+ } else if (vector instanceof byte[]) {
+ return BinaryVector.fromPrimitiveArray((byte[]) vector);
+ } else if (vector instanceof short[]) {
+ return BinaryVector.fromPrimitiveArray((short[]) vector);
+ } else if (vector instanceof int[]) {
+ return BinaryVector.fromPrimitiveArray((int[]) vector);
+ } else if (vector instanceof long[]) {
+ return BinaryVector.fromPrimitiveArray((long[]) vector);
+ } else if (vector instanceof float[]) {
+ return BinaryVector.fromPrimitiveArray((float[]) vector);
+ } else if (vector instanceof double[]) {
+ return BinaryVector.fromPrimitiveArray((double[]) vector);
+ }
+ if (vector instanceof scala.collection.Seq) {
+ vector =
JavaConverters.seqAsJavaList((scala.collection.Seq<Object>) vector);
+ } else if (vector.getClass().isArray()) {
+ vector = Arrays.asList((Object[]) vector);
+ }
+ if (!(vector instanceof List)) {
+ throw new UnsupportedOperationException(
+ "Unsupported vector object: " +
vector.getClass().getName());
+ }
+ return toPaimonVector(vectorType, (List<?>) vector);
+ }
+
+ /**
+ * Copies a Java list of boxed values into a {@link BinaryVector} of the vector's element type.
+ *
+ * <p>Numeric elements are converted through the matching {@link Number} accessor (for example
+ * {@code floatValue()} for FLOAT); BOOLEAN elements are cast to {@link Boolean}.
+ * NOTE(review): a {@code null} element fails with a NullPointerException during unboxing.
+ *
+ * @throws IllegalArgumentException if the list size differs from {@link VectorType#getLength()}
+ * @throws UnsupportedOperationException if the element type is not BOOLEAN, TINYINT,
+ *     SMALLINT, INTEGER, BIGINT, FLOAT or DOUBLE
+ */
+ private static InternalVector toPaimonVector(VectorType vectorType,
List<?> list) {
+ int expectedLength = vectorType.getLength();
+ if (list.size() != expectedLength) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Vector length mismatch. Expected %d but was %d.",
+ expectedLength, list.size()));
+ }
+ // One primitive array per element type; each is handed to BinaryVector as-is.
+ switch (vectorType.getElementType().getTypeRoot()) {
+ case BOOLEAN:
+ boolean[] booleanValues = new boolean[expectedLength];
+ for (int i = 0; i < expectedLength; i++) {
+ booleanValues[i] = (Boolean) list.get(i);
+ }
+ return BinaryVector.fromPrimitiveArray(booleanValues);
+ case TINYINT:
+ byte[] byteValues = new byte[expectedLength];
+ for (int i = 0; i < expectedLength; i++) {
+ byteValues[i] = ((Number) list.get(i)).byteValue();
+ }
+ return BinaryVector.fromPrimitiveArray(byteValues);
+ case SMALLINT:
+ short[] shortValues = new short[expectedLength];
+ for (int i = 0; i < expectedLength; i++) {
+ shortValues[i] = ((Number) list.get(i)).shortValue();
+ }
+ return BinaryVector.fromPrimitiveArray(shortValues);
+ case INTEGER:
+ int[] intValues = new int[expectedLength];
+ for (int i = 0; i < expectedLength; i++) {
+ intValues[i] = ((Number) list.get(i)).intValue();
+ }
+ return BinaryVector.fromPrimitiveArray(intValues);
+ case BIGINT:
+ long[] longValues = new long[expectedLength];
+ for (int i = 0; i < expectedLength; i++) {
+ longValues[i] = ((Number) list.get(i)).longValue();
+ }
+ return BinaryVector.fromPrimitiveArray(longValues);
+ case FLOAT:
+ float[] floatValues = new float[expectedLength];
+ for (int i = 0; i < expectedLength; i++) {
+ floatValues[i] = ((Number) list.get(i)).floatValue();
+ }
+ return BinaryVector.fromPrimitiveArray(floatValues);
+ case DOUBLE:
+ double[] doubleValues = new double[expectedLength];
+ for (int i = 0; i < expectedLength; i++) {
+ doubleValues[i] = ((Number) list.get(i)).doubleValue();
+ }
+ return BinaryVector.fromPrimitiveArray(doubleValues);
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported element type for vector " +
vectorType.getElementType());
+ }
+ }
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
index dc2f8b30ac..c16f16a429 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
@@ -44,6 +44,7 @@ import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.types.VariantType;
+import org.apache.paimon.types.VectorType;
import org.apache.spark.sql.paimon.shims.SparkShimLoader;
import org.apache.spark.sql.types.DataType;
@@ -127,8 +128,12 @@ public class SparkTypeUtils {
} else if (sparkDataType instanceof
org.apache.spark.sql.types.ArrayType) {
org.apache.spark.sql.types.ArrayType s =
(org.apache.spark.sql.types.ArrayType) sparkDataType;
- ArrayType r = (ArrayType) paimonDataType;
- return r.newElementType(prunePaimonType(s.elementType(),
r.getElementType()));
+ // A VECTOR also appears as a Spark array, but it is kept whole: its element type is not pruned.
+ if (paimonDataType instanceof VectorType) {
+ return paimonDataType;
+ } else {
+ ArrayType r = (ArrayType) paimonDataType;
+ return r.newElementType(prunePaimonType(s.elementType(),
r.getElementType()));
+ }
} else {
return paimonDataType;
}
@@ -242,6 +247,11 @@ public class SparkTypeUtils {
return DataTypes.createArrayType(elementType.accept(this),
elementType.isNullable());
}
+ /**
+ * Maps a VECTOR to a Spark array of its element type with {@code containsNull = false}; the
+ * fixed vector length is not represented in the Spark type.
+ */
+ @Override
+ public DataType visit(VectorType vectorType) {
+ return
DataTypes.createArrayType(vectorType.getElementType().accept(this), false);
+ }
+
@Override
public DataType visit(MultisetType multisetType) {
return DataTypes.createMapType(
diff --git
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkMultimodalITCase.java
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkMultimodalITCase.java
new file mode 100644
index 0000000000..c3c15d1b76
--- /dev/null
+++
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkMultimodalITCase.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.TestHiveMetastore;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests for Paimon multimodal (vector) type support on Spark, run against an embedded
+ * Hive metastore.
+ */
+public class SparkMultimodalITCase {
+
+ private static TestHiveMetastore testHiveMetastore;
+ // Thrift port of the embedded metastore. NOTE(review): a fixed port can collide with other
+ // services on the test host (9092 is also Kafka's default port).
+ private static final int PORT = 9092;
+
+ @BeforeAll
+ public static void startMetastore() {
+ testHiveMetastore = new TestHiveMetastore();
+ testHiveMetastore.start(PORT);
+ }
+
+ @AfterAll
+ public static void closeMetastore() throws Exception {
+ testHiveMetastore.stop();
+ }
+
+ /**
+ * Builds a local Spark session whose {@code spark_catalog} is Paimon's {@link SparkCatalog}
+ * backed by the embedded Hive metastore, with format tables and Paimon SQL extensions enabled.
+ */
+ private SparkSession.Builder createSparkSessionBuilder(Path warehousePath)
{
+ return SparkSession.builder()
+ .config("spark.sql.warehouse.dir", warehousePath.toString())
+ // with hive metastore
+ .config("spark.sql.catalogImplementation", "hive")
+ .config("hive.metastore.uris", "thrift://localhost:" + PORT)
+ .config("spark.sql.catalog.spark_catalog",
SparkCatalog.class.getName())
+ .config("spark.sql.catalog.spark_catalog.metastore", "hive")
+ .config(
+ "spark.sql.catalog.spark_catalog.hive.metastore.uris",
+ "thrift://localhost:" + PORT)
+
.config("spark.sql.catalog.spark_catalog.format-table.enabled", "true")
+ .config("spark.sql.catalog.spark_catalog.warehouse",
warehousePath.toString())
+ .config(
+ "spark.sql.extensions",
+
"org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
+ .master("local[2]");
+ }
+
+ /**
+ * Creates a table with a 4-dimensional float vector column, writes 8 rows, builds a
+ * vector-ANN global index and checks both a plain scan and a vector_search query. Each step
+ * runs in a session obtained after the previous one was closed.
+ */
+ @Test
+ public void testVector(@TempDir java.nio.file.Path tempDir) throws
IOException {
+ Path warehousePath = new Path("file:" + tempDir.toString());
+ SparkSession.Builder builder =
createSparkSessionBuilder(warehousePath);
+ SparkSession spark = builder.getOrCreate();
+ spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
+ spark.sql("USE spark_catalog.my_db1");
+
+ // Create a partitioned table; 'embs' is declared ARRAY<FLOAT> and turned into a
+ // 4-dimensional vector by 'vector-field' plus 'field.embs.vector-dim'.
+ spark.sql(
+ "\n"
+ + "CREATE TABLE my_db1.vector_test (gid BIGINT, sid
STRING, embs ARRAY<FLOAT>)"
+ + " PARTITIONED BY (`date` STRING COMMENT 'date') ROW
FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n"
+ + "WITH\n"
+ + " SERDEPROPERTIES ('serialization.format' = '1')
STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat'
OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES
(\n"
+ + " 'vector.file.format'='lance',\n"
+ + " 'vector-field'='embs',\n"
+ + " 'field.embs.vector-dim'='4',\n"
+ + " 'row-tracking.enabled'='true',\n"
+ + " 'data-evolution.enabled'='true',\n"
+ + " 'global-index.enabled' = 'true'\n"
+ + ");");
+ spark.close();
+
+ // Write 8 rows, each with a 4-element vector, into the '20260420' partition.
+ spark = builder.getOrCreate();
+ spark.sql(
+ "insert overwrite table my_db1.vector_test\n"
+ + "VALUES (1, '1', array(cast(1.0 as float), cast(2.0
as float), cast(3.0 as float), cast(4.0 as float)), '20260420'),\n"
+ + "(2, '2', array(cast(2.0 as float), cast(3.0 as
float), cast(4.0 as float), cast(5.0 as float)), '20260420'),\n"
+ + "(3, '3', array(cast(3.0 as float), cast(4.0 as
float), cast(5.0 as float), cast(6.0 as float)), '20260420'),\n"
+ + "(4, '4', array(cast(4.0 as float), cast(5.0 as
float), cast(6.0 as float), cast(7.0 as float)), '20260420'),\n"
+ + "(5, '5', array(cast(5.0 as float), cast(6.0 as
float), cast(7.0 as float), cast(8.0 as float)), '20260420'),\n"
+ + "(6, '6', array(cast(6.0 as float), cast(7.0 as
float), cast(8.0 as float), cast(9.0 as float)), '20260420'),\n"
+ + "(7, '7', array(cast(7.0 as float), cast(8.0 as
float), cast(9.0 as float), cast(10.0 as float)), '20260420'),\n"
+ + "(8, '8', array(cast(8.0 as float), cast(9.0 as
float), cast(10.0 as float), cast(11.0 as float)), '20260420');");
+ spark.close();
+
+ // Build a 'lumina-vector-ann' global index on 'embs' for that partition.
+ spark = builder.getOrCreate();
+ spark.sql(
+ "\n"
+ + "CALL sys.create_global_index(\n"
+ + " `table` => 'my_db1.vector_test',\n"
+ + " `partitions` => \"date='20260420'\",\n"
+ + " `index_column` => 'embs',\n"
+ + " `index_type` => 'lumina-vector-ann',\n"
+ + " `options` => 'lumina.index.dimension=4'\n"
+ + ");");
+ spark.close();
+
+ // A plain scan returns all 8 rows; vector_search with last argument 5 (presumably the
+ // top-k limit) returns 5 rows.
+ spark = builder.getOrCreate();
+ List<Row> rows =
+ spark.sql("select gid, sid, embs from my_db1.vector_test where
date = '20260420';")
+ .collectAsList();
+ assertThat(rows).hasSize(8);
+ rows =
+ spark.sql(
+ "select gid, sid, embs from
vector_search('my_db1.vector_test', 'embs', array(1.0f, 2.0f, 3.0f, 4.0f), 5)
where date = '20260420'")
+ .collectAsList();
+ assertThat(rows).hasSize(5);
+ spark.close();
+ }
+}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
index fdc7558fd5..424981916f 100644
---
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
+++
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.spark;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Test;
@@ -107,4 +108,22 @@ public class SparkTypeTest {
assertThat(toPaimonType(sparkType)).isEqualTo(ALL_TYPES);
}
+
+ /**
+ * Checks the Spark mapping of VECTOR: field nullability follows the vector's nullability,
+ * while the resulting Spark array never declares nullable elements.
+ */
+ @Test
+ public void testVectorType() {
+ RowType rowType =
+ RowType.builder()
+ .field("nullable_vec", DataTypes.VECTOR(3,
DataTypes.FLOAT()))
+ .field("notnull_vec", DataTypes.VECTOR(3,
DataTypes.FLOAT()).notNull())
+ .build();
+ StructType sparkType = fromPaimonRowType(rowType);
+
+ assertThat(sparkType.apply("nullable_vec").nullable()).isTrue();
+ ArrayType nullableArray = (ArrayType)
sparkType.apply("nullable_vec").dataType();
+ assertThat(nullableArray.containsNull()).isFalse();
+
+ assertThat(sparkType.apply("notnull_vec").nullable()).isFalse();
+ ArrayType notNullArray = (ArrayType)
sparkType.apply("notnull_vec").dataType();
+ assertThat(notNullArray.containsNull()).isFalse();
+ }
}