nsivabalan commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r1025339063
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -72,93 +70,116 @@ public static HoodieMergeHelper newInstance() {
}
@Override
- public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>,
HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
- HoodieMergeHandle<T, HoodieData<HoodieRecord<T>>,
HoodieData<HoodieKey>, HoodieData<WriteStatus>> mergeHandle) throws IOException
{
- final boolean externalSchemaTransformation =
table.getConfig().shouldUseExternalSchemaTransformation();
- Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
+ public void runMerge(HoodieTable<?, ?, ?, ?> table,
+ HoodieMergeHandle<?, ?, ?, ?> mergeHandle) throws
IOException {
+ HoodieWriteConfig writeConfig = table.getConfig();
HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
- final GenericDatumWriter<GenericRecord> gWriter;
- final GenericDatumReader<GenericRecord> gReader;
- Schema readSchema;
- if (externalSchemaTransformation ||
baseFile.getBootstrapBaseFile().isPresent()) {
- readSchema =
HoodieFileReaderFactory.getFileReader(table.getHadoopConf(),
mergeHandle.getOldFilePath()).getSchema();
- gWriter = new GenericDatumWriter<>(readSchema);
- gReader = new GenericDatumReader<>(readSchema,
mergeHandle.getWriterSchemaWithMetaFields());
- } else {
- gReader = null;
- gWriter = null;
- readSchema = mergeHandle.getWriterSchemaWithMetaFields();
- }
+ Configuration hadoopConf = new Configuration(table.getHadoopConf());
+ HoodieFileReader<GenericRecord> reader =
HoodieFileReaderFactory.getFileReader(hadoopConf, mergeHandle.getOldFilePath());
- HoodieExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
- HoodieFileReader<GenericRecord> reader =
HoodieFileReaderFactory.getFileReader(cfgForHoodieFile,
mergeHandle.getOldFilePath());
+ Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
+ Schema readerSchema = reader.getSchema();
- Option<InternalSchema> querySchemaOpt =
SerDeHelper.fromJson(table.getConfig().getInternalSchema());
- boolean needToReWriteRecord = false;
- Map<String, String> renameCols = new HashMap<>();
- // TODO support bootstrap
- if (querySchemaOpt.isPresent() &&
!baseFile.getBootstrapBaseFile().isPresent()) {
- // check implicitly add columns, and position reorder(spark sql may
change cols order)
- InternalSchema querySchema =
AvroSchemaEvolutionUtils.reconcileSchema(readSchema, querySchemaOpt.get());
- long commitInstantTime =
Long.valueOf(FSUtils.getCommitTime(mergeHandle.getOldFilePath().getName()));
- InternalSchema writeInternalSchema =
InternalSchemaCache.searchSchemaAndCache(commitInstantTime,
table.getMetaClient(), table.getConfig().getInternalSchemaCacheEnable());
- if (writeInternalSchema.isEmptySchema()) {
- throw new HoodieException(String.format("cannot find file schema for
current commit %s", commitInstantTime));
- }
- List<String> colNamesFromQuerySchema = querySchema.getAllColsFullName();
- List<String> colNamesFromWriteSchema =
writeInternalSchema.getAllColsFullName();
- List<String> sameCols = colNamesFromWriteSchema.stream()
- .filter(f -> colNamesFromQuerySchema.contains(f)
- && writeInternalSchema.findIdByName(f) ==
querySchema.findIdByName(f)
- && writeInternalSchema.findIdByName(f) != -1
- &&
writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList());
- readSchema = AvroInternalSchemaConverter
- .convert(new InternalSchemaMerger(writeInternalSchema, querySchema,
true, false, false).mergeSchema(), readSchema.getName());
- Schema writeSchemaFromFile =
AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());
- needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
- ||
SchemaCompatibility.checkReaderWriterCompatibility(readSchema,
writeSchemaFromFile).getType() ==
org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
- if (needToReWriteRecord) {
- renameCols =
InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
- }
- }
+ // In case Advanced Schema Evolution is enabled we might need to rewrite
currently
+ // persisted records to adhere to an evolved schema
+ Option<Function<GenericRecord, GenericRecord>>
schemaEvolutionTransformerOpt =
+ composeSchemaEvolutionTransformer(writerSchema, baseFile, writeConfig,
table.getMetaClient());
+
+ // Check whether the writer schema is simply a projection of the file's
one, ie
+ // - Its field-set is a proper subset (of the reader schema)
+ // - There's no schema evolution transformation necessary
+ boolean isPureProjection = isProjectionOf(readerSchema, writerSchema)
+ && !schemaEvolutionTransformerOpt.isPresent();
+ // Check whether we will need to rewrite target (already merged) records
into the
+ // writer's schema
+ boolean shouldRewriteInWriterSchema =
writeConfig.shouldUseExternalSchemaTransformation()
+ || !isPureProjection
+ || baseFile.getBootstrapBaseFile().isPresent();
+
+ HoodieExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
try {
- final Iterator<GenericRecord> readerIterator;
+ Iterator<GenericRecord> recordIterator;
+
+ // In case writer's schema is simply a projection of the reader's one we
can read
+ // the records in the projected schema directly
+ ClosableIterator<GenericRecord> baseFileRecordIterator =
+ reader.getRecordIterator(isPureProjection ? writerSchema :
readerSchema);
Review Comment:
Can we add docs on what "reader schema" and "writer schema" are referring to?
With respect to reading a single Avro file, the usual nomenclature is: the
writer schema refers to the schema with which the file was written, and the
reader schema refers to the new schema with which we are trying to read the
file back.
Here we have 3 things in play: we are reading an existing file using schema Y,
which could have been written using schema X, and we are passing it to
UpsertPartitioner with possible schema evolution.
So, is schema X the writer schema, and does schema Y refer to the reader
schema?
Maybe we should add docs wherever we deal with both of these schemas, and
maintain uniformity across the code.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -72,93 +70,116 @@ public static HoodieMergeHelper newInstance() {
}
@Override
- public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>,
HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
- HoodieMergeHandle<T, HoodieData<HoodieRecord<T>>,
HoodieData<HoodieKey>, HoodieData<WriteStatus>> mergeHandle) throws IOException
{
- final boolean externalSchemaTransformation =
table.getConfig().shouldUseExternalSchemaTransformation();
- Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
+ public void runMerge(HoodieTable<?, ?, ?, ?> table,
+ HoodieMergeHandle<?, ?, ?, ?> mergeHandle) throws
IOException {
+ HoodieWriteConfig writeConfig = table.getConfig();
HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
- final GenericDatumWriter<GenericRecord> gWriter;
- final GenericDatumReader<GenericRecord> gReader;
- Schema readSchema;
- if (externalSchemaTransformation ||
baseFile.getBootstrapBaseFile().isPresent()) {
- readSchema =
HoodieFileReaderFactory.getFileReader(table.getHadoopConf(),
mergeHandle.getOldFilePath()).getSchema();
- gWriter = new GenericDatumWriter<>(readSchema);
- gReader = new GenericDatumReader<>(readSchema,
mergeHandle.getWriterSchemaWithMetaFields());
- } else {
- gReader = null;
- gWriter = null;
- readSchema = mergeHandle.getWriterSchemaWithMetaFields();
- }
+ Configuration hadoopConf = new Configuration(table.getHadoopConf());
+ HoodieFileReader<GenericRecord> reader =
HoodieFileReaderFactory.getFileReader(hadoopConf, mergeHandle.getOldFilePath());
- HoodieExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
- HoodieFileReader<GenericRecord> reader =
HoodieFileReaderFactory.getFileReader(cfgForHoodieFile,
mergeHandle.getOldFilePath());
+ Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
+ Schema readerSchema = reader.getSchema();
- Option<InternalSchema> querySchemaOpt =
SerDeHelper.fromJson(table.getConfig().getInternalSchema());
- boolean needToReWriteRecord = false;
- Map<String, String> renameCols = new HashMap<>();
- // TODO support bootstrap
- if (querySchemaOpt.isPresent() &&
!baseFile.getBootstrapBaseFile().isPresent()) {
- // check implicitly add columns, and position reorder(spark sql may
change cols order)
- InternalSchema querySchema =
AvroSchemaEvolutionUtils.reconcileSchema(readSchema, querySchemaOpt.get());
- long commitInstantTime =
Long.valueOf(FSUtils.getCommitTime(mergeHandle.getOldFilePath().getName()));
- InternalSchema writeInternalSchema =
InternalSchemaCache.searchSchemaAndCache(commitInstantTime,
table.getMetaClient(), table.getConfig().getInternalSchemaCacheEnable());
- if (writeInternalSchema.isEmptySchema()) {
- throw new HoodieException(String.format("cannot find file schema for
current commit %s", commitInstantTime));
- }
- List<String> colNamesFromQuerySchema = querySchema.getAllColsFullName();
- List<String> colNamesFromWriteSchema =
writeInternalSchema.getAllColsFullName();
- List<String> sameCols = colNamesFromWriteSchema.stream()
- .filter(f -> colNamesFromQuerySchema.contains(f)
- && writeInternalSchema.findIdByName(f) ==
querySchema.findIdByName(f)
- && writeInternalSchema.findIdByName(f) != -1
- &&
writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList());
- readSchema = AvroInternalSchemaConverter
- .convert(new InternalSchemaMerger(writeInternalSchema, querySchema,
true, false, false).mergeSchema(), readSchema.getName());
- Schema writeSchemaFromFile =
AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());
- needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
- ||
SchemaCompatibility.checkReaderWriterCompatibility(readSchema,
writeSchemaFromFile).getType() ==
org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
- if (needToReWriteRecord) {
- renameCols =
InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
- }
- }
+ // In case Advanced Schema Evolution is enabled we might need to rewrite
currently
+ // persisted records to adhere to an evolved schema
+ Option<Function<GenericRecord, GenericRecord>>
schemaEvolutionTransformerOpt =
+ composeSchemaEvolutionTransformer(writerSchema, baseFile, writeConfig,
table.getMetaClient());
+
+ // Check whether the writer schema is simply a projection of the file's
one, ie
+ // - Its field-set is a proper subset (of the reader schema)
+ // - There's no schema evolution transformation necessary
+ boolean isPureProjection = isProjectionOf(readerSchema, writerSchema)
+ && !schemaEvolutionTransformerOpt.isPresent();
+ // Check whether we will need to rewrite target (already merged) records
into the
+ // writer's schema
+ boolean shouldRewriteInWriterSchema =
writeConfig.shouldUseExternalSchemaTransformation()
+ || !isPureProjection
+ || baseFile.getBootstrapBaseFile().isPresent();
+
+ HoodieExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
try {
- final Iterator<GenericRecord> readerIterator;
+ Iterator<GenericRecord> recordIterator;
+
+ // In case writer's schema is simply a projection of the reader's one we
can read
+ // the records in the projected schema directly
+ ClosableIterator<GenericRecord> baseFileRecordIterator =
+ reader.getRecordIterator(isPureProjection ? writerSchema :
readerSchema);
if (baseFile.getBootstrapBaseFile().isPresent()) {
- readerIterator = getMergingIterator(table, mergeHandle, baseFile,
reader, readSchema, externalSchemaTransformation);
+ Path bootstrapFilePath = new
Path(baseFile.getBootstrapBaseFile().get().getPath());
+ recordIterator = getMergingIterator(table, mergeHandle,
bootstrapFilePath, baseFileRecordIterator);
+ } else if (schemaEvolutionTransformerOpt.isPresent()) {
+ recordIterator = new MappingIterator<>(baseFileRecordIterator,
+ schemaEvolutionTransformerOpt.get());
} else {
- if (needToReWriteRecord) {
- readerIterator =
HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(),
readSchema, renameCols);
- } else {
- readerIterator = reader.getRecordIterator(readSchema);
- }
+ recordIterator = baseFileRecordIterator;
}
- ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
- ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
-
- wrapper = QueueBasedExecutorFactory.create(table.getConfig(),
readerIterator, new UpdateHandler(mergeHandle), record -> {
- if (!externalSchemaTransformation) {
+ wrapper = QueueBasedExecutorFactory.create(writeConfig, recordIterator,
new UpdateHandler(mergeHandle), record -> {
+ if (shouldRewriteInWriterSchema) {
+ return rewriteRecordWithNewSchema(record, writerSchema);
+ } else {
Review Comment:
What happened to the transformRecordBasedOnNewSchema call when external schema
transformation was enabled (as per master)? I don't see it in this patch. Was
it removed intentionally? Can you help clarify, please?
```
wrapper = new
BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(),
readerIterator,
new UpdateHandler(mergeHandle), record -> {
if (!externalSchemaTransformation) {
return record;
}
return transformRecordBasedOnNewSchema(gReader, gWriter,
encoderCache, decoderCache, (GenericRecord) record);
}, table.getPreExecuteRunnable());
```
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##########
@@ -515,7 +521,16 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key ->
hoodieProps.getString(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "200"),
// set the default parallelism to 200 for sql
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key ->
hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key ->
hoodieProps.getString(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "200"),
- SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL
+ SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL,
+
+ // NOTE: We have to explicitly override following configs to make sure
no schema validation is performed
+ // as schema of the incoming dataset might be diverging from the
table's schema (full schemas'
+ // compatibility b/w table's schema and incoming one is not
necessary in this case since we can
+ // be cherry-picking only selected columns from the incoming
dataset to be inserted/updated in the
+ // target table, ie partially updating)
+ AVRO_SCHEMA_VALIDATE_ENABLE.key -> "false",
+ RECONCILE_SCHEMA.key -> "false",
Review Comment:
Does this mean that, with MERGE INTO, we don't support reconciling the schema?
I am with you on the intent; I am trying to gauge what is not supported with
MERGE INTO with respect to schema evolution. Do we expect users to always go
through ALTER TABLE then?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -74,90 +75,103 @@ public static HoodieMergeHelper newInstance() {
@Override
public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>,
HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
HoodieMergeHandle<T, HoodieData<HoodieRecord<T>>,
HoodieData<HoodieKey>, HoodieData<WriteStatus>> mergeHandle) throws IOException
{
- final boolean externalSchemaTransformation =
table.getConfig().shouldUseExternalSchemaTransformation();
- Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
+ HoodieWriteConfig writeConfig = table.getConfig();
Review Comment:
+1, this definitely looks better now.
##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java:
##########
@@ -87,4 +96,44 @@ public void close() {
public long getTotalRecords() {
return parquetUtils.getRowCount(conf, path);
}
+
+ private static Configuration tryOverrideDefaultConfigs(Configuration conf) {
+ // NOTE: Parquet uses elaborate encoding of the arrays/lists with optional
types,
+ // following structure will be representing such list in Parquet:
+ //
+ // optional group tip_history (LIST) {
+ // repeated group list {
+ // optional group element {
+ // optional double amount;
+ // optional binary currency (STRING);
+ // }
+ // }
+ // }
+ //
+ // To designate list, special logical-type annotation (`LIST`) is
used,
+ // as well additional [[GroupType]] with the name "list" is wrapping
+ // the "element" type (representing record stored inside the list
itself).
+ //
+ // By default [[AvroSchemaConverter]] would be interpreting any
{@code REPEATED}
+ // Parquet [[GroupType]] as list, skipping the checks whether
additional [[GroupType]]
+ // (named "list") is actually wrapping the "element" type therefore
incorrectly
+ // converting it into an additional record-wrapper (instead of
simply omitting it).
+ // To work this around we're
+ // - Checking whether
[[AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS]] has been
+ // explicitly set in the Hadoop Config
+ // - In case it's not, we override the default value from "true"
to "false"
+ //
+ if (conf.get(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS) == null) {
Review Comment:
Is this something new we are adding in this patch, or was it moved from
elsewhere?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java:
##########
@@ -18,91 +18,47 @@
package org.apache.hudi.table.action.commit;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.utils.MergingIterator;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Iterator;
/**
* Helper to read records from previous version of base file and run Merge.
*/
-public abstract class BaseMergeHelper<T extends HoodieRecordPayload, I, K, O> {
+public abstract class BaseMergeHelper {
/**
* Read records from previous version of base file and merge.
* @param table Hoodie Table
* @param upsertHandle Merge Handle
* @throws IOException in case of error
*/
- public abstract void runMerge(HoodieTable<T, I, K, O> table,
HoodieMergeHandle<T, I, K, O> upsertHandle) throws IOException;
-
- protected GenericRecord
transformRecordBasedOnNewSchema(GenericDatumReader<GenericRecord> gReader,
GenericDatumWriter<GenericRecord> gWriter,
Review Comment:
What was the purpose of this method?
My understanding is: in the case of the Bootstrapping feature, we need to read
records from the old file using the schema with which it was written, and then
read them back using the newer schema that is of interest to us.
Also, within composeSchemaEvolutionTransformer() as per this patch, we could
return Option.empty() when there is no InternalSchema, even for bootstrap
scenarios. So, in that case, there is no rewrite of records happening. Can you
help throw some light on this, please?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]