This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 74e7dd4786 [#1570] feat(spark-connector): Support partition management 
(#7067)
74e7dd4786 is described below

commit 74e7dd47862aecab2d1884cdcf86e7f05083722c
Author: tian bao <[email protected]>
AuthorDate: Thu May 15 20:18:34 2025 +0800

    [#1570] feat(spark-connector): Support partition management (#7067)
    
    ### What changes were proposed in this pull request?
    
    Supports operations (add, list, drop) on the following field types:
    
    - STRING, CHAR, VARCHAR
    - INT, TINYINT, SMALLINT, BIGINT, FLOAT, DOUBLE, DECIMAL
    - DATE
    - BOOLEAN
    
    Operations not yet supported:
    
    TIMESTAMP and BINARY (these should be used by very few people)
    
    Load is not supported because Spark DataSource V2 does not support it.
    
    ### Why are the changes needed?
    
    Support partition management.
    https://github.com/apache/gravitino/issues/1570
    
    
    ### Does this PR introduce _any_ user-facing change?
    no
    ### How was this patch tested?
    1. UT for partition convert.
    2. IT test for partition sql.
    3. Spark SQL tests.
---
 .../spark/connector/hive/SparkHiveTable.java       |  75 ++++++++-
 .../utils/HiveGravitinoOperationOperator.java      | 178 +++++++++++++++++++++
 .../spark/connector/utils/SparkPartitionUtils.java | 161 +++++++++++++++++++
 .../integration/test/hive/SparkHiveCatalogIT.java  |  51 ++++++
 .../integration/test/util/SparkUtilIT.java         |   4 +
 .../connector/utils/TestSparkPartitionUtils.java   | 153 ++++++++++++++++++
 6 files changed, 621 insertions(+), 1 deletion(-)

diff --git 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/hive/SparkHiveTable.java
 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/hive/SparkHiveTable.java
index 784d493a5c..fbe684aefc 100644
--- 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/hive/SparkHiveTable.java
+++ 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/hive/SparkHiveTable.java
@@ -19,23 +19,30 @@
 
 package org.apache.gravitino.spark.connector.hive;
 
+import com.google.common.base.Preconditions;
 import java.util.Map;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.spark.connector.PropertiesConverter;
 import org.apache.gravitino.spark.connector.SparkTransformConverter;
 import org.apache.gravitino.spark.connector.SparkTypeConverter;
 import org.apache.gravitino.spark.connector.utils.GravitinoTableInfoHelper;
+import 
org.apache.gravitino.spark.connector.utils.HiveGravitinoOperationOperator;
 import org.apache.kyuubi.spark.connector.hive.HiveTable;
 import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException;
+import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException;
 import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
 
 /** Keep consistent behavior with the SparkIcebergTable */
-public class SparkHiveTable extends HiveTable {
+public class SparkHiveTable extends HiveTable implements 
SupportsPartitionManagement {
 
   private GravitinoTableInfoHelper gravitinoTableInfoHelper;
+  private HiveGravitinoOperationOperator hiveGravitinoOperationOperator;
 
   public SparkHiveTable(
       Identifier identifier,
@@ -54,6 +61,7 @@ public class SparkHiveTable extends HiveTable {
             propertiesConverter,
             sparkTransformConverter,
             sparkTypeConverter);
+    this.hiveGravitinoOperationOperator = new 
HiveGravitinoOperationOperator(gravitinoTable);
   }
 
   @Override
@@ -76,4 +84,69 @@ public class SparkHiveTable extends HiveTable {
   public Transform[] partitioning() {
     return gravitinoTableInfoHelper.partitioning();
   }
+
+  @Override
+  public void createPartition(InternalRow ident, Map<String, String> 
properties)
+      throws PartitionAlreadyExistsException, UnsupportedOperationException {
+    hiveGravitinoOperationOperator.createPartition(ident, properties, 
partitionSchema());
+  }
+
+  @Override
+  public boolean dropPartition(InternalRow ident) {
+    return hiveGravitinoOperationOperator.dropPartition(ident, 
partitionSchema());
+  }
+
+  @Override
+  public void replacePartitionMetadata(InternalRow ident, Map<String, String> 
properties)
+      throws NoSuchPartitionException, UnsupportedOperationException {
+    throw new UnsupportedOperationException("Replace partition is not 
supported");
+  }
+
+  @Override
+  public Map<String, String> loadPartitionMetadata(InternalRow ident)
+      throws UnsupportedOperationException {
+    return hiveGravitinoOperationOperator.loadPartitionMetadata(ident, 
partitionSchema());
+  }
+
+  @Override
+  public InternalRow[] listPartitionIdentifiers(String[] names, InternalRow 
ident) {
+    return hiveGravitinoOperationOperator.listPartitionIdentifiers(names, 
ident, partitionSchema());
+  }
+
+  @Override
+  public boolean partitionExists(InternalRow ident) {
+    String[] partitionNames = partitionSchema().names();
+    Preconditions.checkArgument(
+        ident.numFields() == partitionNames.length,
+        String.format(
+            "The number of fields (%d) in the partition identifier is not 
equal to "
+                + "the partition schema length (%d). "
+                + "The identifier might not refer to one partition.",
+            ident.numFields(), partitionNames.length));
+
+    return hiveGravitinoOperationOperator.partitionExists(partitionNames, 
ident, partitionSchema());
+  }
+
+  @Override
+  public Object productElement(int n) {
+    if (n == 0) {
+      return gravitinoTableInfoHelper;
+    }
+
+    if (n == 1) {
+      return hiveGravitinoOperationOperator;
+    }
+
+    throw new IndexOutOfBoundsException("Invalid index: " + n);
+  }
+
+  @Override
+  public int productArity() {
+    return 2;
+  }
+
+  @Override
+  public boolean canEqual(Object that) {
+    return that instanceof SparkHiveTable;
+  }
 }
diff --git 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/utils/HiveGravitinoOperationOperator.java
 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/utils/HiveGravitinoOperationOperator.java
new file mode 100644
index 0000000000..a88cdfbf70
--- /dev/null
+++ 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/utils/HiveGravitinoOperationOperator.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.utils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.exceptions.NoSuchPartitionException;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.partitions.Partition;
+import org.apache.gravitino.rel.partitions.Partitions;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.jetbrains.annotations.NotNull;
+
+public class HiveGravitinoOperationOperator {
+
+  private org.apache.gravitino.rel.Table gravitinoTable;
+  private static final String PARTITION_NAME_DELIMITER = "/";
+  private static final String PARTITION_VALUE_DELIMITER = "=";
+
+  public HiveGravitinoOperationOperator(org.apache.gravitino.rel.Table 
gravitinoTable) {
+    this.gravitinoTable = gravitinoTable;
+  }
+
+  public void createPartition(
+      InternalRow ident, Map<String, String> properties, StructType 
partitionSchema)
+      throws PartitionAlreadyExistsException {
+    List<String[]> fields = new ArrayList<>();
+    List<Literal<?>> values = new ArrayList<>();
+
+    int numFields = ident.numFields();
+    for (int i = 0; i < numFields; i++) {
+      StructField structField = partitionSchema.apply(i);
+      DataType dataType = structField.dataType();
+      fields.add(new String[] {structField.name()});
+      values.add(SparkPartitionUtils.toGravitinoLiteral(ident, i, dataType));
+    }
+
+    Partition partition =
+        Partitions.identity(
+            null, fields.toArray(new String[0][0]), values.toArray(new 
Literal[0]), properties);
+
+    try {
+      gravitinoTable.supportPartitions().addPartition(partition);
+    } catch (org.apache.gravitino.exceptions.PartitionAlreadyExistsException 
e) {
+      throw new 
org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException(
+          e.getMessage());
+    }
+  }
+
+  public boolean dropPartition(InternalRow ident, StructType partitionSchema) {
+    String partitionName = getHivePartitionName(ident, partitionSchema);
+    return gravitinoTable.supportPartitions().dropPartition(partitionName);
+  }
+
+  public InternalRow[] listPartitionIdentifiers(
+      String[] names, InternalRow ident, StructType partitionSchema) {
+    // Get all partitions
+    String[] allPartitions = 
gravitinoTable.supportPartitions().listPartitionNames();
+
+    boolean isNeedAll = names != null && names.length == 0 ? true : false;
+
+    String[] partitionNames =
+        getHivePartitionName(names, ident, 
partitionSchema).split(PARTITION_NAME_DELIMITER);
+    String[] partitionNamesWithDelimiter =
+        Arrays.stream(partitionNames)
+            .map(name -> name + PARTITION_NAME_DELIMITER)
+            .toArray(String[]::new);
+
+    return Arrays.stream(allPartitions)
+        .map(e -> e + PARTITION_NAME_DELIMITER)
+        .filter(
+            e -> {
+              if (isNeedAll) {
+                return true;
+              }
+              for (String name : partitionNamesWithDelimiter) {
+                // exactly match
+                if (!e.contains(name)) {
+                  return false;
+                }
+              }
+              return true;
+            })
+        .map(e -> e.substring(0, e.length() - 1))
+        .map(e -> toSparkPartition(e, partitionSchema))
+        .toArray(GenericInternalRow[]::new);
+  }
+
+  public Map<String, String> loadPartitionMetadata(InternalRow ident, 
StructType partitionSchema) {
+    String partitionName = getHivePartitionName(ident, partitionSchema);
+    Partition partition = 
gravitinoTable.supportPartitions().getPartition(partitionName);
+    return partition == null ? Collections.emptyMap() : partition.properties();
+  }
+
+  public boolean partitionExists(String[] names, InternalRow ident, StructType 
partitionSchema) {
+    // Get all partitions
+    if (names != null && names.length == 0) {
+      return gravitinoTable.supportPartitions().listPartitionNames().length > 
0;
+    }
+
+    String partitionName = getHivePartitionName(names, ident, partitionSchema);
+    try {
+      return gravitinoTable.supportPartitions().partitionExists(partitionName);
+    } catch (NoSuchPartitionException noSuchPartitionException) {
+      return false;
+    }
+  }
+
+  private InternalRow toSparkPartition(String partitionName, StructType 
partitionSchema) {
+    String[] splits = partitionName.split(PARTITION_NAME_DELIMITER);
+    Object[] values = new Object[splits.length];
+    for (int i = 0; i < splits.length; i++) {
+      values[i] =
+          SparkPartitionUtils.getSparkPartitionValue(
+              splits[i].split(PARTITION_VALUE_DELIMITER)[1], 
partitionSchema.apply(i).dataType());
+    }
+    return new GenericInternalRow(values);
+  }
+
+  private @NotNull String getHivePartitionName(
+      String[] names, InternalRow ident, StructType partitionSchema) {
+    StringBuilder partitionName = new StringBuilder();
+    for (int i = 0; i < names.length; i++) {
+      StructField structField = partitionSchema.apply(i);
+      DataType dataType = structField.dataType();
+      partitionName.append(
+          names[i]
+              + PARTITION_VALUE_DELIMITER
+              + SparkPartitionUtils.getPartitionValueAsString(ident, i, 
dataType));
+      if (i < names.length - 1) {
+        partitionName.append(PARTITION_NAME_DELIMITER);
+      }
+    }
+    return partitionName.toString();
+  }
+
+  private @NotNull String getHivePartitionName(InternalRow ident, StructType 
partitionSchema) {
+    StringBuilder partitionName = new StringBuilder();
+    int numFields = ident.numFields();
+    for (int i = 0; i < numFields; i++) {
+      StructField structField = partitionSchema.apply(i);
+      DataType dataType = structField.dataType();
+      partitionName.append(
+          structField.name()
+              + PARTITION_VALUE_DELIMITER
+              + SparkPartitionUtils.getPartitionValueAsString(ident, i, 
dataType));
+      if (i < numFields - 1) {
+        partitionName.append(PARTITION_NAME_DELIMITER);
+      }
+    }
+    return partitionName.toString();
+  }
+}
diff --git 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/utils/SparkPartitionUtils.java
 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/utils/SparkPartitionUtils.java
new file mode 100644
index 0000000000..a8e88b6b18
--- /dev/null
+++ 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/utils/SparkPartitionUtils.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.utils;
+
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.BooleanType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.CharType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.FloatType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.VarcharType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class SparkPartitionUtils {
+
+  private SparkPartitionUtils() {}
+
+  public static Literal<?> toGravitinoLiteral(InternalRow ident, int ordinal, 
DataType sparkType) {
+    if (sparkType instanceof ByteType) {
+      return Literals.byteLiteral(ident.getByte(ordinal));
+    } else if (sparkType instanceof ShortType) {
+      return Literals.shortLiteral(ident.getShort(ordinal));
+    } else if (sparkType instanceof IntegerType) {
+      return Literals.integerLiteral(ident.getInt(ordinal));
+    } else if (sparkType instanceof LongType) {
+      return Literals.longLiteral(ident.getLong(ordinal));
+    } else if (sparkType instanceof FloatType) {
+      return Literals.floatLiteral(ident.getFloat(ordinal));
+    } else if (sparkType instanceof DoubleType) {
+      return Literals.doubleLiteral(ident.getDouble(ordinal));
+    } else if (sparkType instanceof DecimalType) {
+      DecimalType decimalType = (DecimalType) sparkType;
+      org.apache.spark.sql.types.Decimal decimal =
+          ident.getDecimal(ordinal, decimalType.precision(), 
decimalType.scale());
+      return Literals.decimalLiteral(
+          
org.apache.gravitino.rel.types.Decimal.of(decimal.toJavaBigDecimal()));
+    } else if (sparkType instanceof StringType) {
+      return Literals.stringLiteral(ident.getString(ordinal));
+    } else if (sparkType instanceof VarcharType) {
+      VarcharType varcharType = (VarcharType) sparkType;
+      return Literals.varcharLiteral(varcharType.length(), 
ident.getString(ordinal));
+    } else if (sparkType instanceof CharType) {
+      CharType charType = (CharType) sparkType;
+      return Literals.of(ident.get(ordinal, sparkType), 
Types.FixedCharType.of(charType.length()));
+    } else if (sparkType instanceof BooleanType) {
+      return Literals.booleanLiteral(ident.getBoolean(ordinal));
+    } else if (sparkType instanceof DateType) {
+      LocalDate localDate = LocalDate.ofEpochDay(ident.getInt(ordinal));
+      return Literals.dateLiteral(localDate);
+    }
+    throw new UnsupportedOperationException("Not support " + 
sparkType.toString());
+  }
+
+  public static String getPartitionValueAsString(
+      InternalRow ident, int ordinal, DataType dataType) {
+    if (ident.isNullAt(ordinal)) {
+      return null;
+    }
+    if (dataType instanceof ByteType) {
+      return String.valueOf(ident.getByte(ordinal));
+    } else if (dataType instanceof ShortType) {
+      return String.valueOf(ident.getShort(ordinal));
+    } else if (dataType instanceof IntegerType) {
+      return String.valueOf(ident.getInt(ordinal));
+    } else if (dataType instanceof StringType) {
+      return ident.getUTF8String(ordinal).toString();
+    } else if (dataType instanceof VarcharType) {
+      return ident.get(ordinal, dataType).toString();
+    } else if (dataType instanceof CharType) {
+      return ident.get(ordinal, dataType).toString();
+    } else if (dataType instanceof DateType) {
+      // DateType spark use int store.
+      LocalDate localDate = LocalDate.ofEpochDay(ident.getInt(ordinal));
+      return localDate.format(DateTimeFormatter.ISO_LOCAL_DATE);
+    } else if (dataType instanceof BooleanType) {
+      return String.valueOf(ident.getBoolean(ordinal));
+    } else if (dataType instanceof LongType) {
+      return String.valueOf(ident.getLong(ordinal));
+    } else if (dataType instanceof DoubleType) {
+      return String.valueOf(ident.getDouble(ordinal));
+    } else if (dataType instanceof FloatType) {
+      return String.valueOf(ident.getFloat(ordinal));
+    } else if (dataType instanceof DecimalType) {
+      return ident
+          .getDecimal(
+              ordinal, ((DecimalType) dataType).precision(), ((DecimalType) 
dataType).scale())
+          .toString();
+    } else {
+      throw new UnsupportedOperationException(
+          String.format("Unsupported partition column type: %s", dataType));
+    }
+  }
+
+  public static Object getSparkPartitionValue(String hivePartitionValue, 
DataType dataType) {
+    if (hivePartitionValue == null) {
+      return null;
+    }
+    try {
+      if (dataType instanceof ByteType) {
+        return Byte.valueOf(hivePartitionValue);
+      } else if (dataType instanceof ShortType) {
+        return Short.valueOf(hivePartitionValue);
+      } else if (dataType instanceof IntegerType) {
+        return Integer.parseInt(hivePartitionValue);
+      } else if (dataType instanceof LongType) {
+        return Long.parseLong(hivePartitionValue);
+      } else if (dataType instanceof StringType) {
+        return UTF8String.fromString(hivePartitionValue);
+      } else if (dataType instanceof DateType) {
+        LocalDate localDate = LocalDate.parse(hivePartitionValue);
+        // DateType spark use int store.
+        return (int) localDate.toEpochDay();
+      } else if (dataType instanceof BooleanType) {
+        return Boolean.parseBoolean(hivePartitionValue);
+      } else if (dataType instanceof DoubleType) {
+        return Double.parseDouble(hivePartitionValue);
+      } else if (dataType instanceof FloatType) {
+        return Float.parseFloat(hivePartitionValue);
+      } else if (dataType instanceof DecimalType) {
+        return Decimal.apply(hivePartitionValue);
+      } else {
+        throw new UnsupportedOperationException("Unsupported partition type: " 
+ dataType);
+      }
+    } catch (Exception e) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Failed to convert partition value '%s' to type %s", 
hivePartitionValue, dataType),
+          e);
+    }
+  }
+}
diff --git 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
index 64c286da39..09d6f902ec 100644
--- 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
+++ 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
@@ -129,6 +129,57 @@ public abstract class SparkHiveCatalogIT extends 
SparkCommonIT {
     checkPartitionDirExists(tableInfo);
   }
 
+  @Test
+  void testManagePartitionTable() {
+    String tableName = "hive_partition_ops_table";
+
+    dropTableIfExists(tableName);
+    String createTableSQL = getCreateSimpleTableString(tableName);
+    createTableSQL = createTableSQL + "PARTITIONED BY (age_p1 INT, age_p2 
STRING)";
+    sql(createTableSQL);
+
+    List<Object[]> partitionInfo = getTablePartitions(tableName);
+    Assertions.assertEquals(0, partitionInfo.size());
+
+    sql("ALTER TABLE  " + tableName + " ADD PARTITION (age_p1=20, 
age_p2='twenty')");
+    sql("ALTER TABLE  " + tableName + " ADD PARTITION (age_p1=21, 
age_p2='twenty one')");
+    partitionInfo = getTablePartitions(tableName);
+    Assertions.assertEquals(2, partitionInfo.size());
+    Assertions.assertEquals("age_p1=20/age_p2=twenty", 
partitionInfo.get(0)[0]);
+    Assertions.assertEquals("age_p1=21/age_p2=twenty one", 
partitionInfo.get(1)[0]);
+
+    sql("ALTER TABLE  " + tableName + " DROP PARTITION (age_p1=20, 
age_p2='twenty')");
+    partitionInfo = getTablePartitions(tableName);
+    Assertions.assertEquals(1, partitionInfo.size());
+    Assertions.assertEquals("age_p1=21/age_p2=twenty one", 
partitionInfo.get(0)[0]);
+
+    sql(
+        "ALTER TABLE  "
+            + tableName
+            + " ADD PARTITION (age_p1=22, age_p2='twenty two') "
+            + "LOCATION 
'/user/hive/warehouse/hive_partition_ops_table/age_p1=22/age_p2=twentytwo' ");
+    partitionInfo = getTablePartitions(tableName);
+    Assertions.assertEquals(2, partitionInfo.size());
+    Assertions.assertEquals("age_p1=21/age_p2=twenty one", 
partitionInfo.get(0)[0]);
+    Assertions.assertEquals("age_p1=22/age_p2=twenty two", 
partitionInfo.get(1)[0]);
+
+    partitionInfo = sql("SHOW PARTITIONS " + tableName + " PARTITION 
(age_p1=21)");
+    Assertions.assertEquals(1, partitionInfo.size());
+    Assertions.assertEquals("age_p1=21/age_p2=twenty one", 
partitionInfo.get(0)[0]);
+
+    // test exactly match
+    partitionInfo = sql("SHOW PARTITIONS " + tableName + " PARTITION 
(age_p1=2)");
+    Assertions.assertEquals(0, partitionInfo.size());
+
+    Exception exception =
+        Assertions.assertThrows(
+            Exception.class,
+            () -> {
+              sql("ALTER TABLE  " + tableName + " ADD PARTITION (age_p1=21, 
age_p2='twenty one')");
+            });
+    Assertions.assertTrue(exception.getMessage().contains("Partition already 
exists"));
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   void testWriteHiveDynamicPartition(boolean isInsertOverWrite) {
diff --git 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
index 5c188f5800..f5f480ff28 100644
--- 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
+++ 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
@@ -138,6 +138,10 @@ public abstract class SparkUtilIT extends BaseIT {
     return SparkTableInfo.create(table.table());
   }
 
+  protected List<Object[]> getTablePartitions(String tableName) {
+    return sql("SHOW PARTITIONS " + tableName);
+  }
+
   protected void dropTableIfExists(String tableName) {
     sql("DROP TABLE IF EXISTS " + tableName);
   }
diff --git 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/utils/TestSparkPartitionUtils.java
 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/utils/TestSparkPartitionUtils.java
new file mode 100644
index 0000000000..1b9737b844
--- /dev/null
+++ 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/utils/TestSparkPartitionUtils.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.spark.connector.utils;
+
+import java.time.LocalDate;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+
+@TestInstance(Lifecycle.PER_CLASS)
+public class TestSparkPartitionUtils {
+
+  boolean boolValue = true;
+  byte byteValue = 100;
+  short shortValue = 1000;
+  int intValue = 100000;
+  long longValue = 10000000000L;
+  float floatValue = 3.14f;
+  double doubleValue = 3.1415926535;
+  UTF8String stringValue = UTF8String.fromString("Hello World");
+  int date = 0;
+  private InternalRow internalRow =
+      new GenericInternalRow(
+          new Object[] {
+            boolValue,
+            byteValue,
+            shortValue,
+            intValue,
+            longValue,
+            floatValue,
+            doubleValue,
+            stringValue,
+            date
+          });
+  private Literal[] literals =
+      new Literals.LiteralImpl[] {
+        Literals.booleanLiteral(boolValue),
+        Literals.byteLiteral(byteValue),
+        Literals.shortLiteral(shortValue),
+        Literals.integerLiteral(intValue),
+        Literals.longLiteral(longValue),
+        Literals.floatLiteral(floatValue),
+        Literals.doubleLiteral(doubleValue),
+        Literals.stringLiteral(stringValue.toString()),
+        Literals.dateLiteral(LocalDate.of(1970, 1, 1)),
+      };
+  private String[] hivePartitionValues = {
+    "true",
+    "100",
+    "1000",
+    "100000",
+    "10000000000",
+    "3.14",
+    "3.1415926535",
+    "Hello World",
+    "1970-01-01"
+  };
+  StructType schema =
+      new StructType(
+          new StructField[] {
+            new StructField("boolean", DataTypes.BooleanType, false, 
Metadata.empty()),
+            new StructField("byte", DataTypes.ByteType, false, 
Metadata.empty()),
+            new StructField("short", DataTypes.ShortType, false, 
Metadata.empty()),
+            new StructField("int", DataTypes.IntegerType, false, 
Metadata.empty()),
+            new StructField("long", DataTypes.LongType, false, 
Metadata.empty()),
+            new StructField("float", DataTypes.FloatType, false, 
Metadata.empty()),
+            new StructField("double", DataTypes.DoubleType, false, 
Metadata.empty()),
+            new StructField("string", DataTypes.StringType, false, 
Metadata.empty()),
+            new StructField("date", DataTypes.DateType, false, 
Metadata.empty())
+          });
+
+  @Test
+  void testToGravitinoLiteral() {
+    int numFields = internalRow.numFields();
+    for (int i = 0; i < numFields; i++) {
+      DataType dataType = schema.apply(i).dataType();
+      Assertions.assertEquals(
+          literals[i], SparkPartitionUtils.toGravitinoLiteral(internalRow, i, 
dataType));
+    }
+
+    Assertions.assertThrowsExactly(
+        UnsupportedOperationException.class,
+        () ->
+            SparkPartitionUtils.toGravitinoLiteral(
+                new GenericInternalRow(new Object[] {"1970-01-01 00:00:00"}),
+                0,
+                DataTypes.TimestampType));
+  }
+
+  @Test
+  void testGetPartitionValueAsString() {
+    int numFields = internalRow.numFields();
+    for (int i = 0; i < numFields; i++) {
+      DataType dataType = schema.apply(i).dataType();
+      Assertions.assertEquals(
+          hivePartitionValues[i],
+          SparkPartitionUtils.getPartitionValueAsString(internalRow, i, 
dataType));
+    }
+
+    Assertions.assertThrowsExactly(
+        UnsupportedOperationException.class,
+        () ->
+            SparkPartitionUtils.getPartitionValueAsString(
+                new GenericInternalRow(new Object[] {"1970-01-01 00:00:00"}),
+                0,
+                DataTypes.TimestampType));
+  }
+
+  @Test
+  void testGetSparkPartitionValue() {
+    int numFields = internalRow.numFields();
+    for (int i = 0; i < numFields; i++) {
+      DataType dataType = schema.apply(i).dataType();
+      Assertions.assertEquals(
+          internalRow.get(i, dataType),
+          SparkPartitionUtils.getSparkPartitionValue(hivePartitionValues[i], 
dataType));
+    }
+
+    Assertions.assertThrowsExactly(
+        UnsupportedOperationException.class,
+        () ->
+            SparkPartitionUtils.getSparkPartitionValue(
+                "1970-01-01 00:00:00", DataTypes.TimestampType));
+  }
+}

Reply via email to