alexeykudinkin commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r1020651640


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -81,20 +79,7 @@
   public static IgnoreRecord IGNORE_RECORD = new IgnoreRecord();
 
   /**
-   * The specified schema of the table. ("specified" denotes that this is 
configured by the client,
-   * as opposed to being implicitly fetched out of the commit metadata)
-   */
-  protected final Schema tableSchema;
-  protected final Schema tableSchemaWithMetaFields;

Review Comment:
   @yihua good call -- that's actually one of the ways I tried to slice this 
PR, but no luck: because of that duplication, these schemas were actually 
misused -- `tableSchema` was the `writerSchema` in some cases and vice versa. 
That started to fail once I began fixing schema handling in 
`HoodieSparkSqlWriter`, which required me to fix this one as well.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -412,15 +412,14 @@ public class HoodieWriteConfig extends HoodieConfig {
           + "OPTIMISTIC_CONCURRENCY_CONTROL: Multiple writers can operate on 
the table and exactly one of them succeed "
           + "if a conflict (writes affect the same file group) is detected.");
 
-  /**
-   * Currently the  use this to specify the write schema.
-   */
-  public static final ConfigProperty<String> WRITE_SCHEMA = ConfigProperty
+  public static final ConfigProperty<String> WRITE_SCHEMA_OVERRIDE = 
ConfigProperty
       .key("hoodie.write.schema")
       .noDefaultValue()
-      .withDocumentation("The specified write schema. In most case, we do not 
need set this parameter,"
-          + " but for the case the write schema is not equal to the specified 
table schema, we can"
-          + " specify the write schema by this parameter. Used by 
MergeIntoHoodieTableCommand");
+      .withDocumentation("Config allowing to override writing schema. This 
might be necessary in "

Review Comment:
   Good catch! Cleaned up wording



##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java:
##########
@@ -378,7 +377,7 @@ protected Iterator<List<WriteStatus>> 
handleUpdateInternal(HoodieMergeHandle<?,
       throw new HoodieUpsertException(
           "Error in finding the old file path at commit " + instantTime + " 
for fileId: " + fileId);
     } else {
-      FlinkMergeHelper.newInstance().runMerge(this, upsertHandle);
+      HoodieMergeHelper.newInstance().runMerge(this, upsertHandle);

Review Comment:
   You mean just removing the files? Yeah, I can extract that, but those 
won't really reduce the LOC by much (it's a one-line swap plus complete 
removal of the old file).



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java:
##########
@@ -18,91 +18,47 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.utils.MergingIterator;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Iterator;
 
 /**
  * Helper to read records from previous version of base file and run Merge.
  */
-public abstract class BaseMergeHelper<T extends HoodieRecordPayload, I, K, O> {
+public abstract class BaseMergeHelper {
 
   /**
    * Read records from previous version of base file and merge.
    * @param table Hoodie Table
    * @param upsertHandle Merge Handle
    * @throws IOException in case of error
    */
-  public abstract void runMerge(HoodieTable<T, I, K, O> table, 
HoodieMergeHandle<T, I, K, O> upsertHandle) throws IOException;
-
-  protected GenericRecord 
transformRecordBasedOnNewSchema(GenericDatumReader<GenericRecord> gReader, 
GenericDatumWriter<GenericRecord> gWriter,
-                                                               
ThreadLocal<BinaryEncoder> encoderCache, ThreadLocal<BinaryDecoder> 
decoderCache,
-                                                               GenericRecord 
gRec) {
-    ByteArrayOutputStream inStream = null;
-    try {
-      inStream = new ByteArrayOutputStream();
-      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(inStream, 
encoderCache.get());
-      encoderCache.set(encoder);
-      gWriter.write(gRec, encoder);
-      encoder.flush();
-
-      BinaryDecoder decoder = 
DecoderFactory.get().binaryDecoder(inStream.toByteArray(), decoderCache.get());
-      decoderCache.set(decoder);
-      GenericRecord transformedRec = gReader.read(null, decoder);
-      return transformedRec;
-    } catch (IOException e) {
-      throw new HoodieException(e);
-    } finally {
-      try {
-        inStream.close();
-      } catch (IOException ioe) {
-        throw new HoodieException(ioe.getMessage(), ioe);
-      }
-    }
-  }
+  public abstract void runMerge(HoodieTable<?, ?, ?, ?> table, 
HoodieMergeHandle<?, ?, ?, ?> upsertHandle) throws IOException;
 
   /**
    * Create Parquet record iterator that provides a stitched view of record 
read from skeleton and bootstrap file.
    * Skeleton file is a representation of the bootstrap file inside the table, 
with just the bare bone fields needed
    * for indexing, writing and other functionality.
    *
    */
-  protected Iterator<GenericRecord> getMergingIterator(HoodieTable<T, I, K, O> 
table, HoodieMergeHandle<T, I, K, O> mergeHandle,
-                                                                               
                HoodieBaseFile baseFile, HoodieFileReader<GenericRecord> reader,
-                                                                               
                Schema readSchema, boolean externalSchemaTransformation) throws 
IOException {
-    Path externalFilePath = new 
Path(baseFile.getBootstrapBaseFile().get().getPath());
+  protected Iterator<GenericRecord> getMergingIterator(HoodieTable<?, ?, ?, ?> 
table,
+                                                       HoodieMergeHandle<?, ?, 
?, ?> mergeHandle,
+                                                       Path bootstrapFilePath,

Review Comment:
   This is basically the old config 
`AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE`. My guess is that it was 
previously used to force the user to specify whether they want records 
rewritten in the new schema (otherwise the write would fail if the schema 
changed).



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -72,92 +69,113 @@ public static HoodieMergeHelper newInstance() {
   }
 
   @Override
-  public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, 
HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
-                       HoodieMergeHandle<T, HoodieData<HoodieRecord<T>>, 
HoodieData<HoodieKey>, HoodieData<WriteStatus>> mergeHandle) throws IOException 
{
-    final boolean externalSchemaTransformation = 
table.getConfig().shouldUseExternalSchemaTransformation();
-    Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
+  public void runMerge(HoodieTable<?, ?, ?, ?> table,
+                       HoodieMergeHandle<?, ?, ?, ?> mergeHandle) throws 
IOException {
+    HoodieWriteConfig writeConfig = table.getConfig();
     HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
 
-    final GenericDatumWriter<GenericRecord> gWriter;
-    final GenericDatumReader<GenericRecord> gReader;
-    Schema readSchema;
-    if (externalSchemaTransformation || 
baseFile.getBootstrapBaseFile().isPresent()) {
-      readSchema = 
HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), 
mergeHandle.getOldFilePath()).getSchema();
-      gWriter = new GenericDatumWriter<>(readSchema);
-      gReader = new GenericDatumReader<>(readSchema, 
mergeHandle.getWriterSchemaWithMetaFields());
-    } else {
-      gReader = null;
-      gWriter = null;
-      readSchema = mergeHandle.getWriterSchemaWithMetaFields();
-    }
+    Configuration hadoopConf = new Configuration(table.getHadoopConf());
+    HoodieFileReader<GenericRecord> reader = 
HoodieFileReaderFactory.getFileReader(hadoopConf, mergeHandle.getOldFilePath());
 
-    BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
-    HoodieFileReader<GenericRecord> reader = 
HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, 
mergeHandle.getOldFilePath());
+    Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
+    Schema readerSchema = reader.getSchema();
 
-    Option<InternalSchema> querySchemaOpt = 
SerDeHelper.fromJson(table.getConfig().getInternalSchema());
-    boolean needToReWriteRecord = false;
-    Map<String, String> renameCols = new HashMap<>();
-    // TODO support bootstrap
-    if (querySchemaOpt.isPresent() && 
!baseFile.getBootstrapBaseFile().isPresent()) {
-      // check implicitly add columns, and position reorder(spark sql may 
change cols order)
-      InternalSchema querySchema = 
AvroSchemaEvolutionUtils.reconcileSchema(readSchema, querySchemaOpt.get());
-      long commitInstantTime = 
Long.valueOf(FSUtils.getCommitTime(mergeHandle.getOldFilePath().getName()));
-      InternalSchema writeInternalSchema = 
InternalSchemaCache.searchSchemaAndCache(commitInstantTime, 
table.getMetaClient(), table.getConfig().getInternalSchemaCacheEnable());
-      if (writeInternalSchema.isEmptySchema()) {
-        throw new HoodieException(String.format("cannot find file schema for 
current commit %s", commitInstantTime));
-      }
-      List<String> colNamesFromQuerySchema = querySchema.getAllColsFullName();
-      List<String> colNamesFromWriteSchema = 
writeInternalSchema.getAllColsFullName();
-      List<String> sameCols = colNamesFromWriteSchema.stream()
-              .filter(f -> colNamesFromQuerySchema.contains(f)
-                      && writeInternalSchema.findIdByName(f) == 
querySchema.findIdByName(f)
-                      && writeInternalSchema.findIdByName(f) != -1
-                      && 
writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList());
-      readSchema = AvroInternalSchemaConverter
-          .convert(new InternalSchemaMerger(writeInternalSchema, querySchema, 
true, false, false).mergeSchema(), readSchema.getName());
-      Schema writeSchemaFromFile = 
AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());
-      needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
-              || 
SchemaCompatibility.checkReaderWriterCompatibility(readSchema, 
writeSchemaFromFile).getType() == 
org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
-      if (needToReWriteRecord) {
-        renameCols = 
InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
-      }
-    }
+    // Check whether the writer schema is simply a projection of the file's one
+    boolean isProjection = isProjectionOf(readerSchema, writerSchema);
+    // Check whether we will need to rewrite target (already merged) records 
into the
+    // writer's schema
+    boolean shouldRewriteInWriterSchema = 
writeConfig.shouldUseExternalSchemaTransformation()
+        || !isProjection

Review Comment:
   Correct. If the writer's schema is not a projection of the current file's 
schema (`readerSchema`), we will have to rewrite the records.



##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java:
##########
@@ -371,62 +383,69 @@ private static Schema 
visitInternalMapToBuildAvroMap(Types.MapType map, Schema k
    * Converts hudi PrimitiveType to Avro PrimitiveType.
    * this is auxiliary function used by visitInternalSchemaToBuildAvroSchema
    */
-  private static Schema 
visitInternalPrimitiveToBuildAvroPrimitiveType(Type.PrimitiveType primitive) {
-    Schema primitiveSchema;
+  private static Schema 
visitInternalPrimitiveToBuildAvroPrimitiveType(Type.PrimitiveType primitive, 
String recordName) {
     switch (primitive.typeId()) {
       case BOOLEAN:
-        primitiveSchema = Schema.create(Schema.Type.BOOLEAN);
-        break;
+        return Schema.create(Schema.Type.BOOLEAN);
+
       case INT:
-        primitiveSchema = Schema.create(Schema.Type.INT);
-        break;
+        return Schema.create(Schema.Type.INT);
+
       case LONG:
-        primitiveSchema = Schema.create(Schema.Type.LONG);
-        break;
+        return Schema.create(Schema.Type.LONG);
+
       case FLOAT:
-        primitiveSchema = Schema.create(Schema.Type.FLOAT);
-        break;
+        return Schema.create(Schema.Type.FLOAT);
+
       case DOUBLE:
-        primitiveSchema = Schema.create(Schema.Type.DOUBLE);
-        break;
+        return Schema.create(Schema.Type.DOUBLE);
+
       case DATE:
-        primitiveSchema = LogicalTypes.date()
-                .addToSchema(Schema.create(Schema.Type.INT));
-        break;
+        return LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
+
       case TIME:
-        primitiveSchema = LogicalTypes.timeMicros()
-                .addToSchema(Schema.create(Schema.Type.LONG));
-        break;
+        return 
LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG));
+
       case TIMESTAMP:
-        primitiveSchema = LogicalTypes.timestampMicros()
-                .addToSchema(Schema.create(Schema.Type.LONG));
-        break;
+        return 
LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
+
       case STRING:
-        primitiveSchema = Schema.create(Schema.Type.STRING);
-        break;
-      case UUID:
-        primitiveSchema = LogicalTypes.uuid()
-                .addToSchema(Schema.createFixed("uuid_fixed", null, null, 16));
-        break;
-      case FIXED:
-        Types.FixedType fixed = (Types.FixedType) primitive;
-        primitiveSchema = Schema.createFixed("fixed_" + fixed.getFixedSize(), 
null, null, fixed.getFixedSize());
-        break;
+        return Schema.create(Schema.Type.STRING);
+
       case BINARY:
-        primitiveSchema = Schema.create(Schema.Type.BYTES);
-        break;
-      case DECIMAL:
+        return Schema.create(Schema.Type.BYTES);
+
+      case UUID: {
+        // NOTE: All schemas corresponding to Avro's type [[FIXED]] are 
generated
+        //       with the "fixed" name to stay compatible w/ 
[[SchemaConverters]]
+        String name = recordName + AVRO_NAME_DELIMITER + "fixed";
+        Schema fixedSchema = Schema.createFixed(name, null, null, 16);
+        return LogicalTypes.uuid().addToSchema(fixedSchema);
+      }
+
+      case FIXED: {
+        Types.FixedType fixed = (Types.FixedType) primitive;
+        // NOTE: All schemas corresponding to Avro's type [[FIXED]] are 
generated
+        //       with the "fixed" name to stay compatible w/ 
[[SchemaConverters]]
+        String name = recordName + AVRO_NAME_DELIMITER + "fixed";
+        return Schema.createFixed(name, null, null, fixed.getFixedSize());
+      }
+
+      case DECIMAL: {
         Types.DecimalType decimal = (Types.DecimalType) primitive;
-        primitiveSchema = LogicalTypes.decimal(decimal.precision(), 
decimal.scale())
-                .addToSchema(Schema.createFixed(
-                        "decimal_" + decimal.precision() + "_" + 
decimal.scale(),
-                        null, null, 
computeMinBytesForPrecision(decimal.precision())));
-        break;
+        // NOTE: All schemas corresponding to Avro's type [[FIXED]] are 
generated
+        //       with the "fixed" name to stay compatible w/ 
[[SchemaConverters]]
+        String name = recordName + AVRO_NAME_DELIMITER + "fixed";
+        Schema fixedSchema = Schema.createFixed(name,
+            null, null, computeMinBytesForPrecision(decimal.precision()));
+        return LogicalTypes.decimal(decimal.precision(), decimal.scale())
+            .addToSchema(fixedSchema);
+      }
+      

Review Comment:
   Avro represents UUIDs and Decimals as fixed byte arrays



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to