nsivabalan commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r1027173356
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java:
##########
@@ -243,6 +243,18 @@ private HoodieLogBlock readBlock() throws IOException {
}
}
+  private Option<Schema> getTargetReaderSchemaForBlock() {
+    // We should use the writer's schema to read the log file: after a DDL operation the reader
+    // schema may differ from the writer's schema, and the Avro reader will throw an exception.
+    // E.g., the original writer schema is "a string, b double"; after adding a new column the
+    // reader schema becomes "a string, c int, b double". It is wrong to use the reader schema
+    // to read old log files.
+    // After reading those records with the writer's schema, we rewrite them with the reader
+    // schema in AbstractHoodieLogRecordReader.
+    if (internalSchema.isEmptySchema()) {
+      return Option.ofNullable(this.readerSchema);
+    } else {
+      return Option.empty();
Review Comment:
Even without enabling comprehensive schema evolution, one can evolve the schema with Hudi, right?
So, in such a case, we will still be reading a record written with an older schema using a newer schema. Is that ok?
Because in AbstractHoodieLogRecordReader, within composeEvolvedSchemaTransformer, we add a transformer to rewrite the record only if comprehensive schema evolution is enabled (i.e. internalSchema is non-empty).
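To make the evolution scenario concrete, here is a minimal standalone Avro sketch (plain Avro, not Hudi code; schema and class names are made up for illustration). A record written as "a string, b double" is read back with an evolved reader schema "a string, c int, b double"; Avro resolves fields by name, so the read succeeds and the new column takes its declared default:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SchemaEvolutionReadSketch {
  public static void main(String[] args) throws IOException {
    // Writer's schema at the time the log block was written: "a string, b double"
    Schema writerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
            + "{\"name\":\"a\",\"type\":\"string\"},"
            + "{\"name\":\"b\",\"type\":\"double\"}]}");
    // Evolved reader schema: new column "c" added in the middle, with a default
    Schema readerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
            + "{\"name\":\"a\",\"type\":\"string\"},"
            + "{\"name\":\"c\",\"type\":\"int\",\"default\":0},"
            + "{\"name\":\"b\",\"type\":\"double\"}]}");

    GenericRecord written = new GenericData.Record(writerSchema);
    written.put("a", "x");
    written.put("b", 1.5);

    // Serialize with the writer's schema
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(writerSchema).write(written, encoder);
    encoder.flush();

    // Deserialize, resolving the writer's schema against the evolved reader schema
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    GenericRecord read =
        new GenericDatumReader<GenericRecord>(writerSchema, readerSchema).read(null, decoder);

    // The new column falls back to its declared default
    System.out.println(read.get("c"));
  }
}
```

Note that without a default on "c", the same resolution would fail, which is the failure mode the code comment above guards against.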
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java:
##########
@@ -40,8 +41,13 @@ public SparkAvroPostProcessor(TypedProperties props, JavaSparkContext jssc) {
   @Override
   public Schema processSchema(Schema schema) {
-    return schema != null ? AvroConversionUtils.convertStructTypeToAvroSchema(
-        AvroConversionUtils.convertAvroSchemaToStructType(schema), RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME,
-        RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE) : null;
+    if (schema == null) {
+      return null;
+    }
+
+    StructType structType = AvroConversionUtils.convertAvroSchemaToStructType(schema);
+    // NOTE: It's critical that we preserve incoming schema's qualified record-name to make
+    //       sure we maintain schema's compatibility (as defined by [[AvroSchemaCompatibility]])
+    return AvroConversionUtils.convertStructTypeToAvroSchema(structType, schema.getFullName());
Review Comment:
Is this backwards compatible? I.e., if someone already had a pipeline with 0.12.0 (where the schema was using RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE), then once they start using 0.13.0, the schema's qualified record name is going to be different.
So, just to make sure: all our schema backwards-compatibility checks are resilient to this, right?
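For context on this concern, a small standalone Avro sketch (the schema names below are hypothetical, not taken from Hudi): Avro's own compatibility check treats a top-level record-name mismatch as incompatible even when the fields line up, which is exactly the kind of breakage a changed qualified record name could trigger.

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

public class QualifiedNameCompatSketch {
  public static void main(String[] args) {
    // Hypothetical schemas for illustration: identical fields, different
    // qualified record names (as if the namespace changed across versions)
    Schema written = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"hoodie_record\",\"namespace\":\"hoodie.source\","
            + "\"fields\":[{\"name\":\"a\",\"type\":\"string\"}]}");
    Schema incoming = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\",\"namespace\":\"com.example\","
            + "\"fields\":[{\"name\":\"a\",\"type\":\"string\"}]}");

    // Avro flags the top-level record-name mismatch, even though
    // field-by-field the two schemas are equivalent
    SchemaCompatibility.SchemaPairCompatibility result =
        SchemaCompatibility.checkReaderWriterCompatibility(incoming, written);
    System.out.println(result.getType());
  }
}
```

Record aliases on the reader schema are one way Avro allows such renames to remain compatible.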
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -462,37 +466,33 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSourc
       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
       boolean reconcileSchema = props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key());
       if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null) {
-        // If the target schema is specified through Avro schema,
-        // pass in the schema for the Row-to-Avro conversion
-        // to avoid nullability mismatch between Avro schema and Row schema
-        avroRDDOptional = transformed
-            .map(t -> HoodieSparkUtils.createRdd(
-                t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, reconcileSchema,
-                Option.of(this.userProvidedSchemaProvider.getTargetSchema())
-            ).toJavaRDD());
         schemaProvider = this.userProvidedSchemaProvider;
       } else {
-        // Use Transformed Row's schema if not overridden. If target schema is not specified
-        // default to RowBasedSchemaProvider
-        schemaProvider =
-            transformed
-                .map(r -> {
-                  // determine the targetSchemaProvider. use latestTableSchema if reconcileSchema is enabled.
-                  SchemaProvider targetSchemaProvider = null;
-                  if (reconcileSchema) {
-                    targetSchemaProvider = UtilHelpers.createLatestSchemaProvider(r.schema(), jssc, fs, cfg.targetBasePath);
-                  } else {
-                    targetSchemaProvider = UtilHelpers.createRowBasedSchemaProvider(r.schema(), props, jssc);
-                  }
-                  return (SchemaProvider) new DelegatingSchemaProvider(props, jssc,
-                      dataAndCheckpoint.getSchemaProvider(), targetSchemaProvider); })
-                .orElse(dataAndCheckpoint.getSchemaProvider());
-        avroRDDOptional = transformed
-            .map(t -> HoodieSparkUtils.createRdd(
-                t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, reconcileSchema,
-                Option.ofNullable(schemaProvider.getTargetSchema())
-            ).toJavaRDD());
+        Option<Schema> latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(jssc, fs, cfg.targetBasePath);
+        // Deduce proper target (writer's) schema for the transformed dataset, reconciling its
+        // schema w/ the table's one
+        Option<Schema> targetSchemaOpt = transformed.map(df -> {
+          Schema sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema(),
+              latestTableSchemaOpt.map(Schema::getFullName).orElse(getAvroRecordQualifiedName(cfg.targetTableName)));
+          // Target (writer's) schema is determined based on the incoming source schema
+          // and existing table's one, reconciling the two (if necessary) based on configuration
+          return HoodieSparkSqlWriter.deduceWriterSchema(
Review Comment:
Can we move deduceWriterSchema to SchemaUtils (a new class) or HoodieSparkUtils (instead of HoodieSparkSqlWriter), or some other class, since it's being used across all write paths?
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala:
##########
@@ -57,10 +57,19 @@ class ExpressionPayload(record: GenericRecord,
   }
 
   /**
-   * The schema of this table.
+   * Target schema used for writing records into the table
    */
   private var writeSchema: Schema = _
 
+  /**
+   * Original record's schema
+   *
+   * NOTE: To avoid excessive overhead of serializing original record's Avro schema along
+   *       w/ _every_ record, we instead make it to be provided along with every request
+   *       requiring this record to be deserialized
+   */
+  private var recordSchema: Schema = _
Review Comment:
One difference I spot between the other payloads (OverwriteWithLatest, etc.) and ExpressionPayload is the use of these reader and writer schemas as instance variables. So, once combineAndGetUpdateValue() is called and the call completes, these schemas will add to the payload's in-memory size, which is not the case with the other payloads.
So, for large datasets with hundreds of columns, this might pose a problem.
Can we make them local variables? The trade-off is that we might re-initialize these schemas whenever combineAndGetUpdateValue() or getInsertValue() is called, whereas currently we initialize them only once.
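One possible shape of that suggestion (a hypothetical sketch, not the actual ExpressionPayload code): have each instance keep only the schema's JSON string and resolve the parsed Schema per call from a shared cache, so completed payloads don't retain parsed Schema objects of their own.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.avro.Schema;

public class PayloadSchemaCacheSketch {
  // Shared across all payload instances; each distinct schema string is parsed once
  private static final Map<String, Schema> SCHEMA_CACHE = new ConcurrentHashMap<>();

  static Schema parseSchema(String schemaJson) {
    // computeIfAbsent parses a given schema string at most once
    return SCHEMA_CACHE.computeIfAbsent(schemaJson, s -> new Schema.Parser().parse(s));
  }

  public static void main(String[] args) {
    String json = "{\"type\":\"record\",\"name\":\"rec\","
        + "\"fields\":[{\"name\":\"a\",\"type\":\"string\"}]}";
    // Repeated lookups return the same cached instance, instead of each
    // payload holding on to its own parsed Schema
    System.out.println(parseSchema(json) == parseSchema(json));
  }
}
```

The cache itself still costs memory, but it is bounded by the number of distinct schemas rather than the number of live payload instances.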
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -462,37 +466,33 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSourc
       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
       boolean reconcileSchema = props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key());
       if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null) {
-        // If the target schema is specified through Avro schema,
-        // pass in the schema for the Row-to-Avro conversion
-        // to avoid nullability mismatch between Avro schema and Row schema
-        avroRDDOptional = transformed
-            .map(t -> HoodieSparkUtils.createRdd(
-                t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, reconcileSchema,
-                Option.of(this.userProvidedSchemaProvider.getTargetSchema())
-            ).toJavaRDD());
         schemaProvider = this.userProvidedSchemaProvider;
       } else {
-        // Use Transformed Row's schema if not overridden. If target schema is not specified
-        // default to RowBasedSchemaProvider
-        schemaProvider =
-            transformed
-                .map(r -> {
-                  // determine the targetSchemaProvider. use latestTableSchema if reconcileSchema is enabled.
-                  SchemaProvider targetSchemaProvider = null;
-                  if (reconcileSchema) {
-                    targetSchemaProvider = UtilHelpers.createLatestSchemaProvider(r.schema(), jssc, fs, cfg.targetBasePath);
-                  } else {
-                    targetSchemaProvider = UtilHelpers.createRowBasedSchemaProvider(r.schema(), props, jssc);
-                  }
-                  return (SchemaProvider) new DelegatingSchemaProvider(props, jssc,
-                      dataAndCheckpoint.getSchemaProvider(), targetSchemaProvider); })
-                .orElse(dataAndCheckpoint.getSchemaProvider());
-        avroRDDOptional = transformed
-            .map(t -> HoodieSparkUtils.createRdd(
-                t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, reconcileSchema,
-                Option.ofNullable(schemaProvider.getTargetSchema())
-            ).toJavaRDD());
+        Option<Schema> latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(jssc, fs, cfg.targetBasePath);
+        // Deduce proper target (writer's) schema for the transformed dataset, reconciling its
+        // schema w/ the table's one
+        Option<Schema> targetSchemaOpt = transformed.map(df -> {
+          Schema sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema(),
+              latestTableSchemaOpt.map(Schema::getFullName).orElse(getAvroRecordQualifiedName(cfg.targetTableName)));
+          // Target (writer's) schema is determined based on the incoming source schema
+          // and existing table's one, reconciling the two (if necessary) based on configuration
+          return HoodieSparkSqlWriter.deduceWriterSchema(
+              sourceSchema,
+              HoodieConversionUtils.<Schema>toScalaOption(latestTableSchemaOpt),
+              HoodieConversionUtils.<InternalSchema>toScalaOption(Option.empty()),
Review Comment:
Why are we passing an empty InternalSchema here? Someone could enable comprehensive schema evolution with Deltastreamer as well, right? In such a case, shouldn't deduceWriterSchema honor it?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -462,37 +466,33 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSourc
       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
       boolean reconcileSchema = props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key());
       if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null) {
-        // If the target schema is specified through Avro schema,
-        // pass in the schema for the Row-to-Avro conversion
-        // to avoid nullability mismatch between Avro schema and Row schema
-        avroRDDOptional = transformed
-            .map(t -> HoodieSparkUtils.createRdd(
-                t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, reconcileSchema,
-                Option.of(this.userProvidedSchemaProvider.getTargetSchema())
-            ).toJavaRDD());
         schemaProvider = this.userProvidedSchemaProvider;
       } else {
-        // Use Transformed Row's schema if not overridden. If target schema is not specified
-        // default to RowBasedSchemaProvider
-        schemaProvider =
-            transformed
-                .map(r -> {
-                  // determine the targetSchemaProvider. use latestTableSchema if reconcileSchema is enabled.
-                  SchemaProvider targetSchemaProvider = null;
-                  if (reconcileSchema) {
-                    targetSchemaProvider = UtilHelpers.createLatestSchemaProvider(r.schema(), jssc, fs, cfg.targetBasePath);
-                  } else {
-                    targetSchemaProvider = UtilHelpers.createRowBasedSchemaProvider(r.schema(), props, jssc);
-                  }
-                  return (SchemaProvider) new DelegatingSchemaProvider(props, jssc,
-                      dataAndCheckpoint.getSchemaProvider(), targetSchemaProvider); })
-                .orElse(dataAndCheckpoint.getSchemaProvider());
-        avroRDDOptional = transformed
-            .map(t -> HoodieSparkUtils.createRdd(
-                t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, reconcileSchema,
-                Option.ofNullable(schemaProvider.getTargetSchema())
-            ).toJavaRDD());
+        Option<Schema> latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(jssc, fs, cfg.targetBasePath);
+        // Deduce proper target (writer's) schema for the transformed dataset, reconciling its
+        // schema w/ the table's one
+        Option<Schema> targetSchemaOpt = transformed.map(df -> {
+          Schema sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema(),
+              latestTableSchemaOpt.map(Schema::getFullName).orElse(getAvroRecordQualifiedName(cfg.targetTableName)));
+          // Target (writer's) schema is determined based on the incoming source schema
+          // and existing table's one, reconciling the two (if necessary) based on configuration
+          return HoodieSparkSqlWriter.deduceWriterSchema(
+              sourceSchema,
+              HoodieConversionUtils.<Schema>toScalaOption(latestTableSchemaOpt),
+              HoodieConversionUtils.<InternalSchema>toScalaOption(Option.empty()),
+              HoodieConversionUtils.fromProperties(props));
+        });
Review Comment:
Something to be mindful of: not breaking any of the existing pipelines. The new deduceWriterSchema will canonicalize the schema for all flows. So, are there chances that we did not canonicalize with 0.12.0 for an existing pipeline, but adding canonicalization now might break it or cause issues?
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala:
##########
@@ -274,13 +297,11 @@ object ExpressionPayload {
.maximumSize(1024)
.build[String, Map[IExpressionEvaluator, IExpressionEvaluator]]()
- private val writeSchemaCache = Caffeine.newBuilder()
+ private val schemaCache = Caffeine.newBuilder()
Review Comment:
These caches, cache (L296) and schemaCache (L300), are also adding to the memory footprint of this payload compared to the other payloads. So, yet another difference between this one and the others.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]