parthchandra commented on code in PR #3247:
URL: https://github.com/apache/datafusion-comet/pull/3247#discussion_r2719036610
##########
spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala:
##########
@@ -71,14 +71,138 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
}
/**
- * Converts an Iceberg partition value to JSON format expected by iceberg-rust.
- *
- * iceberg-rust's Literal::try_from_json() expects specific formats for certain types:
- * - Timestamps: ISO string format "yyyy-MM-dd'T'HH:mm:ss.SSSSSS"
- * - Dates: ISO string format "YYYY-MM-DD"
- * - Decimals: String representation
- *
- * See: iceberg-rust/crates/iceberg/src/spec/values/literal.rs
+ * Converts an Iceberg partition value to protobuf format, which is less verbose than JSON.
+ * Timestamps, dates, and field IDs are serialized as integers, and decimals as unscaled
+ * big-integer bytes, rather than as strings.
+ */
+ private def partitionValueToProto(
+ fieldId: Int,
+ fieldTypeStr: String,
+ value: Any): OperatorOuterClass.PartitionValue = {
+ val builder = OperatorOuterClass.PartitionValue.newBuilder()
+ builder.setFieldId(fieldId)
+
+ if (value == null) {
+ builder.setIsNull(true)
+ } else {
+ builder.setIsNull(false)
+ fieldTypeStr match {
+ case t if t.startsWith("timestamp") =>
+ val micros = value match {
+ case l: java.lang.Long => l.longValue()
+ case i: java.lang.Integer => i.longValue()
+ case _ => value.toString.toLong
+ }
+ if (t.contains("tz")) {
+ builder.setTimestampTzVal(micros)
+ } else {
+ builder.setTimestampVal(micros)
+ }
+
+ case "date" =>
+ val days = value.asInstanceOf[java.lang.Integer].intValue()
+ builder.setDateVal(days)
+
+ case d if d.startsWith("decimal(") =>
+ // Serialize as unscaled BigInteger bytes
+ val bigDecimal = value match {
+ case bd: java.math.BigDecimal => bd
+ case _ => new java.math.BigDecimal(value.toString)
+ }
+ val unscaledBytes = bigDecimal.unscaledValue().toByteArray
+ builder.setDecimalVal(com.google.protobuf.ByteString.copyFrom(unscaledBytes))
+
+ case "string" =>
+ builder.setStringVal(value.toString)
+
+ case "int" =>
+ val intVal = value match {
+ case i: java.lang.Integer => i.intValue()
+ case l: java.lang.Long => l.intValue()
+ case _ => value.toString.toInt
+ }
+ builder.setIntVal(intVal)
+
+ case "long" =>
+ val longVal = value match {
+ case l: java.lang.Long => l.longValue()
+ case i: java.lang.Integer => i.longValue()
+ case _ => value.toString.toLong
+ }
+ builder.setLongVal(longVal)
+
+ case "float" =>
+ val floatVal = value match {
+ case f: java.lang.Float => f.floatValue()
+ case d: java.lang.Double => d.floatValue()
+ case _ => value.toString.toFloat
+ }
+ builder.setFloatVal(floatVal)
+
+ case "double" =>
+ val doubleVal = value match {
+ case d: java.lang.Double => d.doubleValue()
+ case f: java.lang.Float => f.doubleValue()
+ case _ => value.toString.toDouble
+ }
+ builder.setDoubleVal(doubleVal)
+
+ case "boolean" =>
+ val boolVal = value match {
+ case b: java.lang.Boolean => b.booleanValue()
+ case _ => value.toString.toBoolean
+ }
+ builder.setBoolVal(boolVal)
+
+ case "uuid" =>
+ // UUID as bytes (16 bytes) or string
+ val uuidBytes = value match {
+ case uuid: java.util.UUID =>
+ val bb = java.nio.ByteBuffer.wrap(new Array[Byte](16))
+ bb.putLong(uuid.getMostSignificantBits)
+ bb.putLong(uuid.getLeastSignificantBits)
+ bb.array()
+ case _ =>
+ // Parse UUID string and convert to bytes
+ val uuid = java.util.UUID.fromString(value.toString)
+ val bb = java.nio.ByteBuffer.wrap(new Array[Byte](16))
+ bb.putLong(uuid.getMostSignificantBits)
+ bb.putLong(uuid.getLeastSignificantBits)
+ bb.array()
+ }
+ builder.setUuidVal(com.google.protobuf.ByteString.copyFrom(uuidBytes))
+
+ case t if t.startsWith("fixed[") || t.startsWith("binary") =>
+ val bytes = value match {
+ case bytes: Array[Byte] => bytes
+ case _ => value.toString.getBytes("UTF-8")
+ }
+ if (t.startsWith("fixed")) {
+ builder.setFixedVal(com.google.protobuf.ByteString.copyFrom(bytes))
+ } else {
+ builder.setBinaryVal(com.google.protobuf.ByteString.copyFrom(bytes))
+ }
+
+ // Fallback: infer type from Java type ?
+ case _ =>
+ value match {
+ case s: String => builder.setStringVal(s)
+ case i: java.lang.Integer => builder.setIntVal(i.intValue())
+ case l: java.lang.Long => builder.setLongVal(l.longValue())
+ case d: java.lang.Double => builder.setDoubleVal(d.doubleValue())
+ case f: java.lang.Float => builder.setFloatVal(f.floatValue())
+ case b: java.lang.Boolean => builder.setBoolVal(b.booleanValue())
+ case other => builder.setStringVal(other.toString)
+ }
+ }
+ }
+
+ builder.build()
+ }
+
+ /**
+ * Legacy JSON serialization function - removed in favor of protobuf. Kept as reference for
+ * conversion logic.
*/
private def partitionValueToJson(fieldTypeStr: String, value: Any): JValue = {
Review Comment:
Found some unused code as a result of removing this. Thanks!
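
For reference, a minimal sketch of the message the new `partitionValueToProto` helper in the hunk above would build for a date partition; the field id (1000) and the generated-proto package in the import are assumptions for illustration, the builder calls follow the hunk itself.

```scala
import org.apache.comet.serde.OperatorOuterClass // assumed package of the generated proto classes

object PartitionValueSketch {
  // 2024-01-01 is 19723 days after the Unix epoch; Spark hands the partition
  // value over as a java.lang.Integer and the helper maps it to date_val.
  val expected: OperatorOuterClass.PartitionValue =
    OperatorOuterClass.PartitionValue
      .newBuilder()
      .setFieldId(1000) // hypothetical Iceberg field id
      .setIsNull(false)
      .setDateVal(19723L) // date_val is an int64 (days since epoch) in operator.proto
      .build()
}
```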
##########
native/core/src/execution/planner.rs:
##########
@@ -2589,6 +2650,105 @@ fn convert_spark_types_to_arrow_schema(
arrow_schema
}
+/// Converts a protobuf PartitionValue to an iceberg Literal.
+///
+/// This replaces JSON parsing with direct protobuf deserialization, using a more compact
+/// representation (e.g., timestamps as integers vs strings).
Review Comment:
done
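
To make the compactness claim concrete: under the removed JSON path a non-tz timestamp partition value travelled as the ISO string described in the old Scala doc comment, while the protobuf path carries only the microsecond count. A minimal Scala sketch of the two encodings; the field id and the import package are assumptions for illustration.

```scala
import org.apache.comet.serde.OperatorOuterClass // assumed package of the generated proto classes

object TimestampEncodingSketch {
  // 2024-01-01T00:00:00.000000 UTC expressed as microseconds since the Unix epoch.
  val micros: Long = 19723L * 24 * 60 * 60 * 1000000L

  // Old path (removed): iceberg-rust's Literal::try_from_json expected an ISO string.
  val asJson: String = "\"2024-01-01T00:00:00.000000\""

  // New path: the same value is a single int64 field in the PartitionValue oneof.
  val asProto: OperatorOuterClass.PartitionValue =
    OperatorOuterClass.PartitionValue
      .newBuilder()
      .setFieldId(1001) // hypothetical Iceberg field id
      .setIsNull(false)
      .setTimestampVal(micros)
      .build()
}
```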
##########
native/proto/src/proto/operator.proto:
##########
@@ -110,6 +110,32 @@ message NativeScan {
bool encryption_enabled = 14;
}
+// Partition value for Iceberg partition data
+message PartitionValue {
+ int32 field_id = 1;
+ oneof value {
+ int32 int_val = 2;
+ int64 long_val = 3;
+ int64 date_val = 4; // days since epoch
+ int64 timestamp_val = 5; // microseconds since epoch
+ int64 timestamp_tz_val = 6; // microseconds with timezone
+ string string_val = 7;
+ double double_val = 8;
+ float float_val = 9;
+ bytes decimal_val = 10; // unscaled BigInteger bytes
+ bool bool_val = 11;
+ bytes uuid_val = 12;
+ bytes fixed_val = 13;
+ bytes binary_val = 14;
Review Comment:
These are Iceberg types though.
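
On the read side, the oneof lets a consumer dispatch without matching on type-name strings. A minimal Scala sketch, assuming only the standard Java accessors protobuf generates for `oneof value` (`getValueCase` plus the per-field getters) and the same assumed package as above; the actual conversion to an Iceberg `Literal` lives on the native side in `planner.rs`.

```scala
import org.apache.comet.serde.OperatorOuterClass.PartitionValue // assumed generated-proto package
import org.apache.comet.serde.OperatorOuterClass.PartitionValue.ValueCase

object PartitionValueReaderSketch {
  def describe(pv: PartitionValue): String =
    if (pv.getIsNull) {
      s"field ${pv.getFieldId}: null"
    } else {
      pv.getValueCase match {
        case ValueCase.DATE_VAL => s"date: ${pv.getDateVal} days since epoch"
        case ValueCase.TIMESTAMP_VAL => s"timestamp: ${pv.getTimestampVal} micros since epoch"
        case ValueCase.DECIMAL_VAL =>
          // decimal_val holds the unscaled BigInteger bytes; the scale comes from the field type.
          s"decimal unscaled: ${new java.math.BigInteger(pv.getDecimalVal.toByteArray)}"
        case other => other.toString
      }
    }
}
```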
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]