alexeykudinkin commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r1028361699
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala:
##########
@@ -57,10 +57,19 @@ class ExpressionPayload(record: GenericRecord,
}
/**
- * The schema of this table.
+ * Target schema used for writing records into the table
*/
private var writeSchema: Schema = _
+ /**
+ * Original record's schema
+ *
+ * NOTE: To avoid the excessive overhead of serializing the original record's Avro schema
+ *       along w/ _every_ record, we instead require it to be provided with every request
+ *       that needs this record to be deserialized
+ */
+ private var recordSchema: Schema = _
Review Comment:
Oh, yeah, I totally forgot about this part.
So the problem is not even that we keep refs to schemas w/in the record, but
that we parse the schema for every record. Created HUDI-5254 to follow up
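To make the HUDI-5254 concern concrete, here is a minimal stdlib-only sketch (hypothetical names, not the actual Hudi code, with a stand-in for Avro's `Schema.Parser`) of memoizing parsed schemas by their string form, so the expensive parse runs once per distinct schema rather than once per record:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: memoize parsed schemas keyed by their JSON string,
// so parsing happens at most once per distinct schema, not once per record.
class SchemaCacheSketch {
  // Counts how often the "expensive" parse actually runs.
  static final AtomicInteger parseCount = new AtomicInteger();

  // Shared cache of parsed schemas, keyed by the schema's string form.
  static final Map<String, Object> SCHEMA_CACHE = new ConcurrentHashMap<>();

  // Stand-in for org.apache.avro.Schema.Parser#parse.
  static Object parseSchema(String json) {
    parseCount.incrementAndGet();
    return new Object(); // placeholder for the parsed Schema object
  }

  static Object getCachedSchema(String json) {
    // computeIfAbsent guarantees the parse runs at most once per key.
    return SCHEMA_CACHE.computeIfAbsent(json, SchemaCacheSketch::parseSchema);
  }
}
```

With this shape, repeated lookups for the same schema string return the same cached object and the parse cost is paid once.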
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala:
##########
@@ -274,13 +297,11 @@ object ExpressionPayload {
.maximumSize(1024)
.build[String, Map[IExpressionEvaluator, IExpressionEvaluator]]()
- private val writeSchemaCache = Caffeine.newBuilder()
+ private val schemaCache = Caffeine.newBuilder()
Review Comment:
These caches are static (shared)
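To illustrate the point (a hypothetical sketch, not the actual `ExpressionPayload` code): a cache held in a companion object / static field is shared by every payload instance in the JVM, so cache keys must fully identify the cached value on their own rather than relying on per-instance state:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a static cache is one map for ALL instances,
// mirroring a companion-object cache shared across payload objects.
class PayloadSketch {
  private static final Map<String, String> SHARED_CACHE = new ConcurrentHashMap<>();

  String lookup(String key) {
    // Every instance reads and writes the same underlying map.
    return SHARED_CACHE.computeIfAbsent(key, k -> "parsed:" + k);
  }
}
```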
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java:
##########
@@ -243,6 +243,18 @@ private HoodieLogBlock readBlock() throws IOException {
}
}
+ private Option<Schema> getTargetReaderSchemaForBlock() {
+     // We should use the writer schema to read the log file: after a DDL operation the
+     // readerSchema may differ from the writeSchema, and the Avro reader would throw an
+     // exception. E.g., the original writeSchema is "a string, b double"; after adding a
+     // new column the readerSchema becomes "a string, c int, b double", and it's wrong to
+     // use that readerSchema to read the old log file.
+     // After reading those records with the writeSchema, we rewrite them with the
+     // readerSchema in AbstractHoodieLogRecordReader.
+ if (internalSchema.isEmptySchema()) {
+ return Option.ofNullable(this.readerSchema);
+ } else {
+ return Option.empty();
Review Comment:
> even w/o enabling comprehensive schema evolution, one can evolve schema w/
> hudi right. So, in such a case, we will still be reading a record written in
> an older schema w/ a newer schema. is that ok?

That's not just ok, that might be the only way for us to read it. The idea is
that we only try to read in a different schema if it's explicitly requested
(for example, by schema evolution); otherwise we always read by the same
schema the file was written in.

> bcoz, in AbstractHoodieLogRecordReader, within
> composeEvolvedSchemaTransformer, we add a transformer to rewrite the record
> only if comprehensive schema evolution is enabled (i.e. internalSchema is
> non-empty)

Good point. Minor evolutions (w/o comprehensive support) will be evened out
when we write the records back; that's why it's currently working as expected.
We can clean this up in a follow-up
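The two-step read described above (decode with the writer schema, then rewrite into the evolved reader schema) can be modeled with plain maps. This is a simplified stdlib-only sketch, not the Avro API or the actual `AbstractHoodieLogRecordReader` code:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the two-step read: the record was decoded using the
// schema it was written in; here we rewrite it into the evolved reader
// schema, filling in defaults for fields the writer never saw.
class SchemaRewriteSketch {
  static Map<String, Object> rewrite(Map<String, Object> record,
                                     List<String> readerFields,
                                     Map<String, Object> defaults) {
    Map<String, Object> out = new LinkedHashMap<>();
    for (String field : readerFields) {
      // Take the written value if present, otherwise the reader schema's default.
      out.put(field, record.containsKey(field) ? record.get(field) : defaults.get(field));
    }
    return out;
  }
}
```

In the "a string, b double" example from the diff comment, a record written with the old schema gets the new column `c` filled from its default when rewritten into the "a string, c int, b double" reader schema.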
##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -76,6 +101,19 @@ public static Schema resolveUnionSchema(Schema schema, String fieldSchemaFullNam
return nonNullType;
}
+ /**
+   * Returns true in case the provided {@link Schema} is nullable (i.e. accepting null values),
+ * returns false otherwise
+ */
+ public static boolean isNullable(Schema schema) {
+ if (schema.getType() != Schema.Type.UNION) {
+ return false;
+ }
+
+     List<Schema> innerTypes = schema.getTypes();
+     return innerTypes.size() > 1
+         && innerTypes.stream().anyMatch(it -> it.getType() == Schema.Type.NULL);
+ }
+
Review Comment:
Forgot to push the commit updating the TODOs
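For reference, the nullability rule in the diff above boils down to: a schema is nullable iff it is a union with more than one branch, one of which is NULL. A stdlib-only model of that check (representing the union as a list of type names, not the Avro API):

```java
import java.util.List;

// Stdlib-only model of the isNullable check: an Avro schema is treated as
// nullable only when it is a union of 2+ branches that includes a NULL branch.
class NullabilitySketch {
  static boolean isNullableUnion(List<String> unionBranches) {
    return unionBranches.size() > 1
        && unionBranches.stream().anyMatch("null"::equals);
  }
}
```

Note that a bare `["null"]` union is not considered nullable under this rule, matching the `size() > 1` guard in the diff.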
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.