the-other-tim-brown commented on code in PR #17833:
URL: https://github.com/apache/hudi/pull/17833#discussion_r2732488755
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala:
##########
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hudi.command
import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
+import org.apache.hudi.SparkAdapterSupport.sparkAdapter
Review Comment:
Remove changes to this file?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2063,8 +2063,12 @@ private static boolean isColumnTypeSupportedV1(HoodieSchema schema, Option<Hoodi
private static boolean isColumnTypeSupportedV2(HoodieSchema schema) {
HoodieSchemaType type = schema.getType();
// Check for precision and scale if the schema has a logical decimal type.
+ // VARIANT (unshredded) type is excluded because it stores semi-structured data as opaque binary blobs,
+ // making min/max statistics meaningless
+ // TODO: For shredded, we are able to store colstats, explore that
Review Comment:
can you link it inline as well?
##########
hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java:
##########
@@ -250,6 +250,10 @@ private Type convertField(String fieldName, HoodieSchema schema, Type.Repetition
break;
case UNION:
return convertUnion(fieldName, schema, repetition, schemaPath);
+ case VARIANT:
Review Comment:
Can you update the `TestAvroSchemaConverter` to cover this branch?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala:
##########
@@ -241,7 +252,15 @@ object HoodieSparkSchemaConverters {
}
}
- case other => throw new IncompatibleSchemaException(s"Unsupported HoodieSchemaType: $other")
+ // VARIANT type (Spark >4.x only), which will be handled via SparkAdapter
Review Comment:
For lower Spark versions, do we want to just return the underlying struct?
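A minimal sketch of that fallback, for illustration only: on Spark versions without `VariantType`, the column could surface as the underlying carrier struct (the same `value`/`metadata` binary pair the Flink test in this PR reads). The version flag and the 4.x type hook below are stand-in parameters, not existing Hudi or SparkAdapter APIs.

```scala
import org.apache.spark.sql.types.{BinaryType, DataType, StructField, StructType}

// Sketch only: fall back to a binary value/metadata struct when VariantType is unavailable.
// `toSparkVariantType` is a placeholder for the SparkAdapter hook used on Spark 4.x.
def variantCatalystType(isSpark4Plus: Boolean, toSparkVariantType: () => DataType): DataType =
  if (isSpark4Plus) {
    toSparkVariantType()
  } else {
    StructType(Seq(
      StructField("value", BinaryType, nullable = true),
      StructField("metadata", BinaryType, nullable = true)))
  }
```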
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestVariantCrossEngineCompatibility.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.testutils.ZipTestUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.utils.FlinkMiniCluster;
+import org.apache.hudi.utils.TestTableEnvs;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Integration test for cross-engine compatibility - verifying that Flink can read Variant tables written by Spark 4.0.
+ */
+@ExtendWith(FlinkMiniCluster.class)
+public class ITTestVariantCrossEngineCompatibility {
+
+ @TempDir
+ Path tempDir;
+
+ /**
+ * Helper method to verify that Flink can read Spark 4.0 Variant tables.
+ * Variant data is represented as ROW<value BYTES, metadata BYTES> in Flink.
+ */
+ private void verifyFlinkCanReadSparkVariantTable(String tablePath, String tableType, String testDescription) throws Exception {
+ TableEnvironment tableEnv = TestTableEnvs.getBatchTableEnv();
+
+ // Create a Hudi table pointing to the Spark-written data
+ // In Flink, Variant is represented as ROW<value BYTES, metadata BYTES>
+ // NOTE: value is a reserved keyword
+ String createTableDdl = String.format(
+ "CREATE TABLE variant_table ("
+ + " id INT,"
+ + " name STRING,"
+ + " v ROW<`value` BYTES, metadata BYTES>,"
Review Comment:
I just want to clarify one thing: the table on disk will have Variant in the Hudi schema for this table, right?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala:
##########
@@ -20,13 +20,14 @@
package org.apache.spark.sql.execution.datasources.parquet
import org.apache.hadoop.conf.Configuration
+import org.apache.hudi.HoodieSparkUtils
import org.apache.parquet.hadoop.metadata.FileMetaData
import org.apache.spark.sql.HoodieSchemaUtils
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.expressions.{ArrayTransform, Attribute, Cast, CreateNamedStruct, CreateStruct, Expression, GetStructField, LambdaFunction, Literal, MapEntries, MapFromEntries, NamedLambdaVariable, UnsafeProjection}
import org.apache.spark.sql.types.{ArrayType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, StringType, StructField, StructType, TimestampNTZType}
-object HoodieParquetFileFormatHelper {
+trait HoodieParquetFileFormatHelperTrait {
Review Comment:
Do we need this switch from object to trait?
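If the trait does need to stay, one low-churn option (sketch only, member names elided) is to keep the original object and have it extend the trait, so existing call sites that reference `HoodieParquetFileFormatHelper` directly keep compiling:

```scala
// Sketch: the helper methods live in the trait; the original object just mixes it in,
// so references to HoodieParquetFileFormatHelper stay source-compatible.
trait HoodieParquetFileFormatHelperTrait {
  // existing helper methods move here unchanged
}

object HoodieParquetFileFormatHelper extends HoodieParquetFileFormatHelperTrait
```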
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -281,6 +282,23 @@ private ValueWriter makeWriter(HoodieSchema schema, DataType dataType) {
} else if (dataType == DataTypes.BinaryType) {
return (row, ordinal) -> recordConsumer.addBinary(
Binary.fromReusedByteArray(row.getBinary(ordinal)));
+ } else if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) {
+ // Maps VariantType to a group containing 'metadata' and 'value' fields.
+ // This ensures Spark 4.0 compatibility and supports both Shredded and Unshredded schemas.
+ // Note: We intentionally omit 'typed_value' for shredded variants as this writer only accesses raw binary blobs.
+ final byte[][] variantBytes = new byte[2][]; // [0] = value, [1] = metadata
Review Comment:
Instead of reusing these byte arrays, can we just return the pair of byte arrays from the variant data to avoid the extra copy?
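For illustration, a small sketch of that suggestion in Scala (the write-support class itself is Java): return an immutable `(value, metadata)` pair from the extraction step instead of filling a reused two-slot buffer. The `extract` hook is a placeholder for whatever adapter call actually pulls the variant out of the row, not an existing API, and the field names and indices only mirror the group layout described in the diff.

```scala
import org.apache.parquet.io.api.{Binary, RecordConsumer}

// Immutable holder for the two variant blobs, replacing the reused byte[2][] buffer.
case class VariantBinary(value: Array[Byte], metadata: Array[Byte])

// `extract` stands in for the adapter call that reads the variant from the Spark row.
def writeVariant(recordConsumer: RecordConsumer, extract: () => VariantBinary): Unit = {
  val variant = extract()
  recordConsumer.startGroup()
  recordConsumer.startField("value", 0)
  recordConsumer.addBinary(Binary.fromConstantByteArray(variant.value))
  recordConsumer.endField("value", 0)
  recordConsumer.startField("metadata", 1)
  recordConsumer.addBinary(Binary.fromConstantByteArray(variant.metadata))
  recordConsumer.endField("metadata", 1)
  recordConsumer.endGroup()
}
```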
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]