YannByron commented on code in PR #2314:
URL: https://github.com/apache/fluss/pull/2314#discussion_r2681022607


##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsFlussPartitionManagement.scala:
##########
@@ -48,6 +59,69 @@ trait SupportsFlussPartitionManagement extends AbstractSparkTable with SupportsP
   override def listPartitionIdentifiers(
       names: Array[String],
       ident: InternalRow): Array[InternalRow] = {
-    throw new UnsupportedOperationException("Listing partition is not supported")
+    assert(
+      names.length == ident.numFields,
+      s"Number of partition names (${names.length}) must be equal to " +
+        s"the number of partition values (${ident.numFields})."
+    )
+    val schema = partitionSchema()
+    assert(
+      names.forall(fieldName => schema.fieldNames.contains(fieldName)),
+      s"Some partition names ${names.mkString("[", ", ", "]")} don't belong to " +
+        s"the partition schema '${schema.sql}'."
+    )
+
+    val flussPartitionRows = admin
+      .listPartitionInfos(tableInfo.getTablePath)
+      .get()
+      .asScala
+      .map(p => toInternalRow(p.getPartitionSpec, schema))
+
+    val indexes = names.map(schema.fieldIndex)
+    val dataTypes = names.map(schema(_).dataType)
+    val currentRow = new GenericInternalRow(new Array[Any](names.length))
+    flussPartitionRows.filter {
+      partRow =>
+        for (i <- names.indices) {
+          currentRow.values(i) = partRow.get(indexes(i), dataTypes(i))
+        }
+        currentRow == ident
+    }.toArray
+  }
+}
+
+object SupportsFlussPartitionManagement {
+  private def toInternalRow(
+      partitionSpec: PartitionSpec,
+      partitionSchema: StructType): InternalRow = {
+    val row = new SpecificInternalRow(partitionSchema)
+    for ((field, i) <- partitionSchema.fields.zipWithIndex) {
+      val partValue = partitionSpec.getSpecMap.get(field.name)
+      val value = field.dataType match {
+        case dt =>
+          // TODO Support more types when needed.
+          PhysicalDataType(field.dataType) match {
+            case PhysicalBooleanType => partValue.toBoolean
+            case PhysicalIntegerType => partValue.toInt
+            case PhysicalDoubleType => partValue.toDouble
+            case PhysicalFloatType => partValue.toFloat
+            case PhysicalLongType => partValue.toLong
+            case PhysicalShortType => partValue.toShort
+            case PhysicalStringType => UTF8String.fromString(partValue)
+          }
+      }
+      row.update(i, value)
+    }
+    row
+  }
+
+  private def toPartitionSpec(row: InternalRow, partitionSchema: StructType): PartitionSpec = {
+    val map = partitionSchema.fields.zipWithIndex.map {

Review Comment:
   ```
       val partitionSpec = partitionSchema.fields.zipWithIndex.map {
         case (field, idx) =>
           (field.name, row.get(idx, field.dataType).toString)
       }.toMap
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to