This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bb161b5d6 [core] support vector on spark (#8019)
6bb161b5d6 is described below

commit 6bb161b5d6f209e82438ebb2da2d21be08835f37
Author: Stefanietry <[email protected]>
AuthorDate: Mon Jun 1 17:49:24 2026 +0800

    [core] support vector on spark (#8019)
---
 .../java/org/apache/paimon/hive/HiveTypeUtils.java |   6 +
 .../PaimonObjectInspectorFactory.java              |   4 +
 .../paimon/spark/AbstractSparkInternalRow.java     |   9 +-
 .../org/apache/paimon/spark/DataConverter.java     |  14 +++
 .../java/org/apache/paimon/spark/SparkCatalog.java |  20 ++++
 .../paimon/spark/SparkInternalRowWrapper.java      |  24 +++-
 .../java/org/apache/paimon/spark/SparkRow.java     |  96 ++++++++++++++-
 .../org/apache/paimon/spark/SparkTypeUtils.java    |  14 ++-
 .../apache/paimon/spark/SparkMultimodalITCase.java | 133 +++++++++++++++++++++
 .../org/apache/paimon/spark/SparkTypeTest.java     |  19 +++
 10 files changed, 333 insertions(+), 6 deletions(-)

diff --git 
a/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java
 
b/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java
index e4799341d1..23c01d1144 100644
--- 
a/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java
+++ 
b/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java
@@ -45,6 +45,7 @@ import org.apache.paimon.types.TinyIntType;
 import org.apache.paimon.types.VarBinaryType;
 import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.types.VariantType;
+import org.apache.paimon.types.VectorType;
 
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
@@ -235,6 +236,11 @@ public class HiveTypeUtils {
             return TypeInfoFactory.binaryTypeInfo;
         }
 
+        @Override
+        public TypeInfo visit(VectorType vectorType) {
+            return 
TypeInfoFactory.getListTypeInfo(vectorType.getElementType().accept(this));
+        }
+
         @Override
         protected TypeInfo defaultMethod(org.apache.paimon.types.DataType 
dataType) {
             throw new UnsupportedOperationException("Unsupported type: " + 
dataType);
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonObjectInspectorFactory.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonObjectInspectorFactory.java
index 9e9eb0f31f..9ad6a1a99b 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonObjectInspectorFactory.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonObjectInspectorFactory.java
@@ -28,6 +28,7 @@ import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TimeType;
 import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.types.VectorType;
 
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
@@ -81,6 +82,9 @@ public class PaimonObjectInspectorFactory {
             case ARRAY:
                 ArrayType arrayType = (ArrayType) logicalType;
                 return new 
PaimonListObjectInspector(arrayType.getElementType());
+            case VECTOR:
+                VectorType vectorType = (VectorType) logicalType;
+                return new 
PaimonListObjectInspector(vectorType.getElementType());
             case MAP:
                 MapType mapType = (MapType) logicalType;
                 return new PaimonMapObjectInspector(mapType.getKeyType(), 
mapType.getValueType());
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/AbstractSparkInternalRow.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/AbstractSparkInternalRow.java
index 283077430e..5910c60744 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/AbstractSparkInternalRow.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/AbstractSparkInternalRow.java
@@ -25,6 +25,7 @@ import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeChecks;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VectorType;
 import org.apache.paimon.utils.InternalRowUtils;
 
 import org.apache.spark.sql.catalyst.util.ArrayData;
@@ -172,7 +173,13 @@ public abstract class AbstractSparkInternalRow extends 
SparkInternalRow {
 
     @Override
     public ArrayData getArray(int ordinal) {
-        return fromPaimon(row.getArray(ordinal), (ArrayType) 
rowType.getTypeAt(ordinal));
+        DataType type = rowType.getTypeAt(ordinal);
+        if (type instanceof ArrayType) {
+            return fromPaimon(row.getArray(ordinal), (ArrayType) type);
+        } else if (type instanceof VectorType) {
+            return DataConverter.fromPaimon(row.getVector(ordinal), 
(VectorType) type);
+        }
+        throw new UnsupportedOperationException("Not an array type: " + type);
     }
 
     @Override
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/DataConverter.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/DataConverter.java
index 0b5ea89947..5c8026f461 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/DataConverter.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/DataConverter.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalVector;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.spark.data.SparkArrayData;
 import org.apache.paimon.spark.data.SparkInternalRow;
@@ -32,6 +33,7 @@ import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.MultisetType;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VectorType;
 
 import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
 import org.apache.spark.sql.catalyst.util.ArrayData;
@@ -58,6 +60,8 @@ public class DataConverter {
                 return fromPaimon((org.apache.paimon.data.Decimal) o);
             case ARRAY:
                 return fromPaimon((InternalArray) o, (ArrayType) type);
+            case VECTOR:
+                return fromPaimon((InternalVector) o, (VectorType) type);
             case MAP:
             case MULTISET:
                 return fromPaimon((InternalMap) o, type);
@@ -93,6 +97,16 @@ public class DataConverter {
         return fromPaimonArrayElementType(array, arrayType.getElementType());
     }
 
+    public static ArrayData fromPaimon(InternalVector vector, VectorType 
vectorType) {
+        if (vector.size() != vectorType.getLength()) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Vector length mismatch. Expected %d but was %d.",
+                            vectorType.getLength(), vector.size()));
+        }
+        return fromPaimonArrayElementType(vector, vectorType.getElementType());
+    }
+
     private static ArrayData fromPaimonArrayElementType(InternalArray array, 
DataType elementType) {
         return SparkArrayData.create(elementType).replace(array);
     }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index c6d35f6d81..b299248a81 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -44,7 +44,9 @@ import org.apache.paimon.table.object.ObjectTable;
 import org.apache.paimon.types.BlobType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.ExceptionUtils;
+import org.apache.paimon.utils.Preconditions;
 
 import org.apache.spark.sql.PaimonSparkSession$;
 import org.apache.spark.sql.SparkSession;
@@ -73,6 +75,7 @@ import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.execution.datasources.DataSource;
 import org.apache.spark.sql.execution.datasources.FileFormat;
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2;
+import org.apache.spark.sql.types.ArrayType;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -563,6 +566,7 @@ public class SparkCatalog extends SparkBaseCatalog
         List<String> blobFields = CoreOptions.blobField(properties);
         Set<String> blobDescriptorFields = new 
CoreOptions(properties).blobDescriptorField();
         List<String> blobViewFields = CoreOptions.blobViewField(properties);
+        Set<String> vectorFields = 
CoreOptions.fromMap(properties).vectorField();
         String provider = properties.get(TableCatalog.PROP_PROVIDER);
         if (!usePaimon(provider)) {
             if (isFormatTable(provider)) {
@@ -609,6 +613,22 @@ public class SparkCatalog extends SparkBaseCatalog
                         field.dataType() instanceof 
org.apache.spark.sql.types.BinaryType,
                         "The type of blob field must be binary");
                 type = new BlobType();
+            } else if (vectorFields.contains(field.name())) {
+                Preconditions.checkArgument(
+                        field.dataType() instanceof ArrayType,
+                        "The type of vector field must be array");
+                ArrayType arrayType = (ArrayType) field.dataType();
+                String dimKey = String.format("field.%s.vector-dim", 
field.name());
+                Preconditions.checkArgument(
+                        properties.containsKey(dimKey),
+                        "When setting '"
+                                + CoreOptions.VECTOR_FIELD.key()
+                                + "', you must also set 'field.%s.vector-dim',"
+                                + " where %s is the name of the vector 
field.");
+                type =
+                        DataTypes.VECTOR(
+                                Integer.parseInt(properties.get(dimKey)),
+                                toPaimonType(arrayType.elementType()));
             } else {
                 type = toPaimonType(field.dataType()).copy(field.nullable());
             }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
index 45a7c0af41..97b5771594 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
@@ -259,7 +259,20 @@ public class SparkInternalRowWrapper implements 
InternalRow, Serializable {
 
     @Override
     public InternalVector getVector(int pos) {
-        throw new UnsupportedOperationException("Not support VectorType yet.");
+        int actualPos = getActualFieldPosition(pos);
+        if (actualPos == -1 || internalRow.isNullAt(actualPos)) {
+            return null;
+        }
+        DataType dataType = tableSchema.fields()[pos].dataType();
+        return toSparkInternalVector(dataType, 
internalRow.getArray(actualPos));
+    }
+
+    private static InternalVector toSparkInternalVector(DataType dataType, 
ArrayData arrayData) {
+        if (!(dataType instanceof ArrayType)) {
+            throw new UnsupportedOperationException("Not a vector type: " + 
dataType);
+        }
+        ArrayType arrayType = (ArrayType) dataType;
+        return new SparkInternalVector(arrayData, arrayType.elementType());
     }
 
     @Override
@@ -435,7 +448,7 @@ public class SparkInternalRowWrapper implements 
InternalRow, Serializable {
 
         @Override
         public InternalVector getVector(int pos) {
-            throw new UnsupportedOperationException("Not support VectorType 
yet.");
+            return toSparkInternalVector(elementType, arrayData.getArray(pos));
         }
 
         @Override
@@ -452,6 +465,13 @@ public class SparkInternalRowWrapper implements 
InternalRow, Serializable {
         }
     }
 
+    /** adapt to spark internal vector. */
+    public static class SparkInternalVector extends SparkInternalArray 
implements InternalVector {
+        public SparkInternalVector(ArrayData arrayData, DataType elementType) {
+            super(arrayData, elementType);
+        }
+    }
+
     /** adapt to spark internal map. */
     public static class SparkInternalMap implements InternalMap {
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
index 84767db9ab..6dea5a5d29 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
@@ -20,6 +20,7 @@ package org.apache.paimon.spark;
 
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.BinaryVector;
 import org.apache.paimon.data.Blob;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.InternalArray;
@@ -35,6 +36,7 @@ import org.apache.paimon.types.DateType;
 import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VectorType;
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.UriReaderFactory;
 
@@ -48,6 +50,7 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -168,7 +171,10 @@ public class SparkRow implements InternalRow, Serializable 
{
 
     @Override
     public InternalVector getVector(int pos) {
-        throw new UnsupportedOperationException("Not support VectorType yet.");
+        if (row.isNullAt(pos)) {
+            return null;
+        }
+        return toPaimonVector((VectorType) type.getTypeAt(pos), row.get(pos));
     }
 
     @Override
@@ -426,4 +432,92 @@ public class SparkRow implements InternalRow, Serializable 
{
             return res;
         }
     }
+
+    private static InternalVector toPaimonVector(VectorType vectorType, Object 
vector) {
+        if (vector == null) {
+            return null;
+        }
+        if (vector instanceof boolean[]) {
+            return BinaryVector.fromPrimitiveArray((boolean[]) vector);
+        } else if (vector instanceof byte[]) {
+            return BinaryVector.fromPrimitiveArray((byte[]) vector);
+        } else if (vector instanceof short[]) {
+            return BinaryVector.fromPrimitiveArray((short[]) vector);
+        } else if (vector instanceof int[]) {
+            return BinaryVector.fromPrimitiveArray((int[]) vector);
+        } else if (vector instanceof long[]) {
+            return BinaryVector.fromPrimitiveArray((long[]) vector);
+        } else if (vector instanceof float[]) {
+            return BinaryVector.fromPrimitiveArray((float[]) vector);
+        } else if (vector instanceof double[]) {
+            return BinaryVector.fromPrimitiveArray((double[]) vector);
+        }
+        if (vector instanceof scala.collection.Seq) {
+            vector = 
JavaConverters.seqAsJavaList((scala.collection.Seq<Object>) vector);
+        } else if (vector.getClass().isArray()) {
+            vector = Arrays.asList((Object[]) vector);
+        }
+        if (!(vector instanceof List)) {
+            throw new UnsupportedOperationException(
+                    "Unsupported vector object: " + 
vector.getClass().getName());
+        }
+        return toPaimonVector(vectorType, (List<?>) vector);
+    }
+
+    private static InternalVector toPaimonVector(VectorType vectorType, 
List<?> list) {
+        int expectedLength = vectorType.getLength();
+        if (list.size() != expectedLength) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Vector length mismatch. Expected %d but was %d.",
+                            expectedLength, list.size()));
+        }
+        switch (vectorType.getElementType().getTypeRoot()) {
+            case BOOLEAN:
+                boolean[] booleanValues = new boolean[expectedLength];
+                for (int i = 0; i < expectedLength; i++) {
+                    booleanValues[i] = (Boolean) list.get(i);
+                }
+                return BinaryVector.fromPrimitiveArray(booleanValues);
+            case TINYINT:
+                byte[] byteValues = new byte[expectedLength];
+                for (int i = 0; i < expectedLength; i++) {
+                    byteValues[i] = ((Number) list.get(i)).byteValue();
+                }
+                return BinaryVector.fromPrimitiveArray(byteValues);
+            case SMALLINT:
+                short[] shortValues = new short[expectedLength];
+                for (int i = 0; i < expectedLength; i++) {
+                    shortValues[i] = ((Number) list.get(i)).shortValue();
+                }
+                return BinaryVector.fromPrimitiveArray(shortValues);
+            case INTEGER:
+                int[] intValues = new int[expectedLength];
+                for (int i = 0; i < expectedLength; i++) {
+                    intValues[i] = ((Number) list.get(i)).intValue();
+                }
+                return BinaryVector.fromPrimitiveArray(intValues);
+            case BIGINT:
+                long[] longValues = new long[expectedLength];
+                for (int i = 0; i < expectedLength; i++) {
+                    longValues[i] = ((Number) list.get(i)).longValue();
+                }
+                return BinaryVector.fromPrimitiveArray(longValues);
+            case FLOAT:
+                float[] floatValues = new float[expectedLength];
+                for (int i = 0; i < expectedLength; i++) {
+                    floatValues[i] = ((Number) list.get(i)).floatValue();
+                }
+                return BinaryVector.fromPrimitiveArray(floatValues);
+            case DOUBLE:
+                double[] doubleValues = new double[expectedLength];
+                for (int i = 0; i < expectedLength; i++) {
+                    doubleValues[i] = ((Number) list.get(i)).doubleValue();
+                }
+                return BinaryVector.fromPrimitiveArray(doubleValues);
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported element type for vector " + 
vectorType.getElementType());
+        }
+    }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
index dc2f8b30ac..c16f16a429 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
@@ -44,6 +44,7 @@ import org.apache.paimon.types.TinyIntType;
 import org.apache.paimon.types.VarBinaryType;
 import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.types.VariantType;
+import org.apache.paimon.types.VectorType;
 
 import org.apache.spark.sql.paimon.shims.SparkShimLoader;
 import org.apache.spark.sql.types.DataType;
@@ -127,8 +128,12 @@ public class SparkTypeUtils {
         } else if (sparkDataType instanceof 
org.apache.spark.sql.types.ArrayType) {
             org.apache.spark.sql.types.ArrayType s =
                     (org.apache.spark.sql.types.ArrayType) sparkDataType;
-            ArrayType r = (ArrayType) paimonDataType;
-            return r.newElementType(prunePaimonType(s.elementType(), 
r.getElementType()));
+            if (paimonDataType instanceof VectorType) {
+                return paimonDataType;
+            } else {
+                ArrayType r = (ArrayType) paimonDataType;
+                return r.newElementType(prunePaimonType(s.elementType(), 
r.getElementType()));
+            }
         } else {
             return paimonDataType;
         }
@@ -242,6 +247,11 @@ public class SparkTypeUtils {
             return DataTypes.createArrayType(elementType.accept(this), 
elementType.isNullable());
         }
 
+        @Override
+        public DataType visit(VectorType vectorType) {
+            return 
DataTypes.createArrayType(vectorType.getElementType().accept(this), false);
+        }
+
         @Override
         public DataType visit(MultisetType multisetType) {
             return DataTypes.createMapType(
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkMultimodalITCase.java
 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkMultimodalITCase.java
new file mode 100644
index 0000000000..c3c15d1b76
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkMultimodalITCase.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.TestHiveMetastore;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for Paimon multimodal type support on Spark. */
+public class SparkMultimodalITCase {
+
+    private static TestHiveMetastore testHiveMetastore;
+    private static final int PORT = 9092;
+
+    @BeforeAll
+    public static void startMetastore() {
+        testHiveMetastore = new TestHiveMetastore();
+        testHiveMetastore.start(PORT);
+    }
+
+    @AfterAll
+    public static void closeMetastore() throws Exception {
+        testHiveMetastore.stop();
+    }
+
+    private SparkSession.Builder createSparkSessionBuilder(Path warehousePath) 
{
+        return SparkSession.builder()
+                .config("spark.sql.warehouse.dir", warehousePath.toString())
+                // with hive metastore
+                .config("spark.sql.catalogImplementation", "hive")
+                .config("hive.metastore.uris", "thrift://localhost:" + PORT)
+                .config("spark.sql.catalog.spark_catalog", 
SparkCatalog.class.getName())
+                .config("spark.sql.catalog.spark_catalog.metastore", "hive")
+                .config(
+                        "spark.sql.catalog.spark_catalog.hive.metastore.uris",
+                        "thrift://localhost:" + PORT)
+                
.config("spark.sql.catalog.spark_catalog.format-table.enabled", "true")
+                .config("spark.sql.catalog.spark_catalog.warehouse", 
warehousePath.toString())
+                .config(
+                        "spark.sql.extensions",
+                        
"org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
+                .master("local[2]");
+    }
+
+    @Test
+    public void testVector(@TempDir java.nio.file.Path tempDir) throws 
IOException {
+        Path warehousePath = new Path("file:" + tempDir.toString());
+        SparkSession.Builder builder = 
createSparkSessionBuilder(warehousePath);
+        SparkSession spark = builder.getOrCreate();
+        spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
+        spark.sql("USE spark_catalog.my_db1");
+
+        // Create table
+        spark.sql(
+                "\n"
+                        + "CREATE TABLE my_db1.vector_test (gid BIGINT, sid 
STRING, embs ARRAY<FLOAT>)"
+                        + " PARTITIONED BY (`date` STRING COMMENT 'date') ROW 
FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n"
+                        + "WITH\n"
+                        + "  SERDEPROPERTIES ('serialization.format' = '1') 
STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat' 
OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES 
(\n"
+                        + "    'vector.file.format'='lance',\n"
+                        + "    'vector-field'='embs',\n"
+                        + "    'field.embs.vector-dim'='4',\n"
+                        + "    'row-tracking.enabled'='true',\n"
+                        + "    'data-evolution.enabled'='true',\n"
+                        + "    'global-index.enabled' = 'true'\n"
+                        + ");");
+        spark.close();
+
+        spark = builder.getOrCreate();
+        spark.sql(
+                "insert overwrite table my_db1.vector_test\n"
+                        + "VALUES (1, '1', array(cast(1.0 as float), cast(2.0 
as float), cast(3.0 as float), cast(4.0 as float)), '20260420'),\n"
+                        + "(2, '2', array(cast(2.0 as float), cast(3.0 as 
float), cast(4.0 as float), cast(5.0 as float)), '20260420'),\n"
+                        + "(3, '3', array(cast(3.0 as float), cast(4.0 as 
float), cast(5.0 as float), cast(6.0 as float)), '20260420'),\n"
+                        + "(4, '4', array(cast(4.0 as float), cast(5.0 as 
float), cast(6.0 as float), cast(7.0 as float)), '20260420'),\n"
+                        + "(5, '5', array(cast(5.0 as float), cast(6.0 as 
float), cast(7.0 as float), cast(8.0 as float)), '20260420'),\n"
+                        + "(6, '6', array(cast(6.0 as float), cast(7.0 as 
float), cast(8.0 as float), cast(9.0 as float)), '20260420'),\n"
+                        + "(7, '7', array(cast(7.0 as float), cast(8.0 as 
float), cast(9.0 as float), cast(10.0 as float)), '20260420'),\n"
+                        + "(8, '8', array(cast(8.0 as float), cast(9.0 as 
float), cast(10.0 as float), cast(11.0 as float)), '20260420');");
+        spark.close();
+
+        spark = builder.getOrCreate();
+        spark.sql(
+                "\n"
+                        + "CALL sys.create_global_index(\n"
+                        + "    `table` => 'my_db1.vector_test',\n"
+                        + "    `partitions` => \"date='20260420'\",\n"
+                        + "    `index_column` => 'embs',\n"
+                        + "    `index_type` => 'lumina-vector-ann',\n"
+                        + "    `options` => 'lumina.index.dimension=4'\n"
+                        + ");");
+        spark.close();
+
+        spark = builder.getOrCreate();
+        List<Row> rows =
+                spark.sql("select gid, sid, embs from my_db1.vector_test where 
date = '20260420';")
+                        .collectAsList();
+        assertThat(rows).hasSize(8);
+        rows =
+                spark.sql(
+                                "select gid, sid,  embs from 
vector_search('my_db1.vector_test', 'embs', array(1.0f, 2.0f, 3.0f, 4.0f), 5)  
where date = '20260420'")
+                        .collectAsList();
+        assertThat(rows).hasSize(5);
+        spark.close();
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
index fdc7558fd5..424981916f 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
+++ 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.spark;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
+import org.apache.spark.sql.types.ArrayType;
 import org.apache.spark.sql.types.StructType;
 import org.junit.jupiter.api.Test;
 
@@ -107,4 +108,22 @@ public class SparkTypeTest {
 
         assertThat(toPaimonType(sparkType)).isEqualTo(ALL_TYPES);
     }
+
+    @Test
+    public void testVectorType() {
+        RowType rowType =
+                RowType.builder()
+                        .field("nullable_vec", DataTypes.VECTOR(3, 
DataTypes.FLOAT()))
+                        .field("notnull_vec", DataTypes.VECTOR(3, 
DataTypes.FLOAT()).notNull())
+                        .build();
+        StructType sparkType = fromPaimonRowType(rowType);
+
+        assertThat(sparkType.apply("nullable_vec").nullable()).isTrue();
+        ArrayType nullableArray = (ArrayType) 
sparkType.apply("nullable_vec").dataType();
+        assertThat(nullableArray.containsNull()).isFalse();
+
+        assertThat(sparkType.apply("notnull_vec").nullable()).isFalse();
+        ArrayType notNullArray = (ArrayType) 
sparkType.apply("notnull_vec").dataType();
+        assertThat(notNullArray.containsNull()).isFalse();
+    }
 }

Reply via email to