the-other-tim-brown commented on code in PR #14374:
URL: https://github.com/apache/hudi/pull/14374#discussion_r2590204913
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -788,11 +802,12 @@ class HoodieSparkSqlWriterInternal {
}
}
- def validateSchemaForHoodieIsDeleted(schema: Schema): Unit = {
- if (schema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD) != null &&
-
getNonNullTypeFromUnion(schema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD).schema()).getType
!= Schema.Type.BOOLEAN) {
+ def validateSchemaForHoodieIsDeleted(schema: HoodieSchema): Unit = {
+ val fieldOpt = schema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD)
+ if (fieldOpt.isPresent &&
+ getNonNullTypeFromUnion(fieldOpt.get().schema().toAvroSchema).getType
!= Schema.Type.BOOLEAN) {
Review Comment:
```suggestion
fieldOpt.get().schema().getNonNullType.getType !=
HoodieSchemaType.BOOLEAN
```
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCache.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+
+/**
+ * A global cache for HoodieSchema instances to ensure that there is only one
+ * variable instance of the same schema within an entire JVM lifetime.
+ *
+ * <p>This is a global cache which works for a JVM lifecycle.
+ * A collection of schema instances are maintained.
+ *
+ * <p>NOTE: The schema which is used frequently should be cached through this
cache.
+ */
+public class HoodieSchemaCache {
+
+ // Ensure that there is only one variable instance of the same schema within
an entire JVM lifetime
+ private static final LoadingCache<HoodieSchema, HoodieSchema> SCHEMA_CACHE =
+ Caffeine.newBuilder().weakValues().maximumSize(1024).build(k -> k);
+
+ /**
+ * Get schema variable from global cache. If not found, put it into the
cache and then return it.
+ *
+ * @param schema schema to get
+ * @return if found, return the exist schema variable, otherwise return the
param itself.
+ */
+ public static HoodieSchema intern(HoodieSchema schema) {
+ return SCHEMA_CACHE.get(schema);
+ }
+}
Review Comment:
nit: make sure there is a newline at the end of file
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -365,16 +365,24 @@ class HoodieSparkSqlWriterInternal {
// NOTE: We need to make sure that upon conversion of the schemas b/w
Catalyst's [[StructType]] and
// Avro's [[Schema]] we're preserving corresponding "record-name"
and "record-namespace" that
// play crucial role in establishing compatibility b/w schemas
- val (avroRecordName, avroRecordNamespace) = latestTableSchemaOpt.map(s
=> (s.getName, s.getNamespace))
+ val (avroRecordName, avroRecordNamespace) = latestTableSchemaOpt.map(s =>
+ (s.getName, toScalaOption(s.getNamespace).orNull))
Review Comment:
Can this optional handling be simplified to `s.getNamespace.orElse(null)`?
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java:
##########
@@ -277,11 +278,11 @@ void testJsonKafkaSourceWithEncodedDecimals() throws
URISyntaxException {
List<GenericRecord> recs = fetch1.getBatch().get().collect();
assertEquals(10, recs.size());
- Schema deducedSchema =
- HoodieSchemaUtils.deduceWriterSchema(schemaProvider.getSourceSchema(),
Option.empty(), Option.empty(), props);
- verifyDecimalValue(recs, deducedSchema, "decfield");
- verifyDecimalValue(recs, deducedSchema, "lowprecision");
- verifyDecimalValue(recs, deducedSchema, "highprecision");
+ HoodieSchema deducedSchema =
+
HoodieSchemaUtils.deduceWriterSchema(HoodieSchema.fromAvroSchema(schemaProvider.getSourceSchema()),
Option.empty(), Option.empty(), props);
+ verifyDecimalValue(recs, deducedSchema.getAvroSchema(), "decfield");
Review Comment:
While we're here, can we easily update this verification to work with
`HoodieSchema` directly instead of converting back with `getAvroSchema()`?
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java:
##########
@@ -138,6 +139,57 @@ public static HoodieSchema
createNullableSchema(HoodieSchema schema) {
return HoodieSchema.fromAvroSchema(nullableAvro);
}
+ /**
+ * Removes specified fields from a RECORD schema.
+ * This is equivalent to HoodieAvroUtils.removeFields() but operates on
HoodieSchema.
+ *
+ * @param schema original schema (must be RECORD type)
+ * @param fieldNamesToRemove set of field names to remove
+ * @return new HoodieSchema without the specified fields
+ * @throws IllegalArgumentException if schema is null or not a RECORD type
+ */
+ public static HoodieSchema removeFields(HoodieSchema schema, Set<String>
fieldNamesToRemove) {
+ ValidationUtils.checkArgument(schema != null, "Schema cannot be null");
+ ValidationUtils.checkArgument(schema.getType() == HoodieSchemaType.RECORD,
Review Comment:
nit: when the message involves string building or concatenation, use the
string-supplier overload so the message is computed lazily.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -169,18 +166,18 @@ object HoodieSchemaUtils {
if (!mergeIntoWrites && !shouldValidateSchemasCompatibility &&
!allowAutoEvolutionColumnDrop) {
// Default behaviour
val reconciledSchema = if (setNullForMissingColumns) {
- AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema,
latestTableSchema, setNullForMissingColumns)
+
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema.toAvroSchema(),
latestTableSchema.toAvroSchema(), setNullForMissingColumns)
} else {
- canonicalizedSourceSchema
+ canonicalizedSourceSchema.toAvroSchema()
}
- checkValidEvolution(reconciledSchema, latestTableSchema)
- reconciledSchema
+
HoodieSchemaCompatibility.checkValidEvolution(HoodieSchema.fromAvroSchema(reconciledSchema),
latestTableSchema)
+ HoodieSchema.fromAvroSchema(reconciledSchema)
Review Comment:
```suggestion
val reconciledSchema = if (setNullForMissingColumns) {
HoodieSchema.fromAvroSchema(AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema.toAvroSchema(),
latestTableSchema.toAvroSchema(), setNullForMissingColumns))
} else {
canonicalizedSourceSchema
}
HoodieSchemaCompatibility.checkValidEvolution(reconciledSchema,
latestTableSchema)
reconciledSchema
```
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java:
##########
@@ -138,6 +139,57 @@ public static HoodieSchema
createNullableSchema(HoodieSchema schema) {
return HoodieSchema.fromAvroSchema(nullableAvro);
}
+ /**
+ * Removes specified fields from a RECORD schema.
+ * This is equivalent to HoodieAvroUtils.removeFields() but operates on
HoodieSchema.
+ *
+ * @param schema original schema (must be RECORD type)
+ * @param fieldNamesToRemove set of field names to remove
+ * @return new HoodieSchema without the specified fields
+ * @throws IllegalArgumentException if schema is null or not a RECORD type
+ */
+ public static HoodieSchema removeFields(HoodieSchema schema, Set<String>
fieldNamesToRemove) {
+ ValidationUtils.checkArgument(schema != null, "Schema cannot be null");
+ ValidationUtils.checkArgument(schema.getType() == HoodieSchemaType.RECORD,
+ "Only RECORD schemas can have fields removed, got: " +
schema.getType());
+
+ if (fieldNamesToRemove == null || fieldNamesToRemove.isEmpty()) {
+ return schema;
+ }
+
+ // Filter and copy fields (must create new instances, can't reuse Avro
fields)
+ List<HoodieSchemaField> filteredFields = schema.getFields().stream()
+ .filter(field -> !fieldNamesToRemove.contains(field.name()))
+ .map(field -> HoodieSchemaField.of(
Review Comment:
There is `createNewSchemaField(HoodieSchemaField field)` now as a shorthand
for this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]