GrigorievNick opened a new issue #2899:
URL: https://github.com/apache/iceberg/issues/2899
When I try to create an Iceberg table with Iceberg bucketing through the DataFrame
v2 API, I get
```
Invalid partition transformation: iceberg_bucket2(`modification_time`)
org.apache.spark.sql.AnalysisException: Invalid partition transformation:
iceberg_bucket2(`modification_time`)
at
org.apache.spark.sql.DataFrameWriterV2.$anonfun$partitionedBy$2(DataFrameWriterV2.scala:102)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
at
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:285)
at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at
org.apache.spark.sql.DataFrameWriterV2.partitionedBy(DataFrameWriterV2.scala:88)
```
I checked the code, and Spark SQL does not accept any partition transformations
there other than the ones predefined in Spark:
```
@scala.annotation.varargs
override def partitionedBy(column: Column, columns: Column*):
CreateTableWriter[T] = {
def ref(name: String): NamedReference =
LogicalExpressions.parseReference(name)
val asTransforms = (column +: columns).map(_.expr).map {
case Years(attr: Attribute) =>
LogicalExpressions.years(ref(attr.name))
case Months(attr: Attribute) =>
LogicalExpressions.months(ref(attr.name))
case Days(attr: Attribute) =>
LogicalExpressions.days(ref(attr.name))
case Hours(attr: Attribute) =>
LogicalExpressions.hours(ref(attr.name))
case Bucket(Literal(numBuckets: Int, IntegerType), attr: Attribute) =>
LogicalExpressions.bucket(numBuckets, Array(ref(attr.name)))
case attr: Attribute =>
LogicalExpressions.identity(ref(attr.name))
case expr =>
throw new AnalysisException(s"Invalid partition transformation:
${expr.sql}")
}
```
Code to reproduce:
```
import org.apache.iceberg.spark.IcebergSpark
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DataTypes
import org.scalatest.FunSuite
class IcebergTest extends FunSuite {
private val testPath = "/tmp/iceberg_cdc_test"
private val testTable = "hdl.test_table"
val sparkSession: SparkSession = SparkSession.builder()
.config("spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions ")
.config("spark.sql.catalog.spark_catalog",
"org.apache.iceberg.spark.SparkSessionCatalog")
.config("spark.sql.catalog.spark_catalog.type", "hadoop")
.config("spark.sql.catalog.spark_catalog.warehouse",
s"$testPath/iceberg_catalog/")
.master("local")
.getOrCreate()
IcebergSpark.registerBucketUDF(sparkSession, "iceberg_bucket2",
DataTypes.LongType, 2)
import sparkSession.implicits._
test("Create table") {
val expectedDate = (0 until 10).map(id => (id, "data1",
System.currentTimeMillis()))
expectedDate
.toDF("id", "data", "modification_time")
.sortWithinPartitions(expr("iceberg_bucket2(modification_time)"))
.writeTo(testTable)
.using("iceberg")
.tableProperty("write.format.default", "orc")
.partitionedBy(expr("iceberg_bucket2(modification_time)"))
.createOrReplace()
sparkSession
.read
.format("iceberg")
.table(testTable)
.show()
//
}
}
```
This looks like a bug, but I am not sure how to fix it.
Any ideas?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]