codope commented on code in PR #9743:
URL: https://github.com/apache/hudi/pull/9743#discussion_r1356499698


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala:
##########
@@ -202,7 +202,16 @@ object AvroConversionUtils {
       }
 
       case Schema.Type.UNION => {
-        Schema.createUnion(schema.getTypes.map(innerSchema => 
getAvroSchemaWithDefaults(innerSchema, dataType)))
+        val innerFields = schema.getTypes
+        val containsNullSchema = 
innerFields.foldLeft(false)((nullFieldEncountered, schema) => 
nullFieldEncountered | schema.getType == Schema.Type.NULL)
+        if (containsNullSchema) {

Review Comment:
   let's extract this part to a separate method. It can probably be reused in 
`getAvroSchemaWithDefaults`. Also, a unit test to cover null in union types 
would be good.



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroCastingGenericRecord.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.hudi.common.util.ValidationUtils;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.UnaryOperator;
+
+import static org.apache.avro.Schema.Type.ARRAY;
+import static org.apache.avro.Schema.Type.MAP;
+import static org.apache.avro.Schema.Type.STRING;
+import static org.apache.avro.Schema.Type.UNION;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+
+
+/**
+ * Implementation of avro generic record that uses casting for implicit schema 
evolution
+ */
+public class AvroCastingGenericRecord implements GenericRecord {
+  private final IndexedRecord record;
+  private final Schema readerSchema;
+  private final Map<Integer, UnaryOperator<Object>> fieldConverters;
+
+  private AvroCastingGenericRecord(IndexedRecord record, Schema readerSchema, 
Map<Integer, UnaryOperator<Object>> fieldConverters) {
+    this.record = record;
+    this.readerSchema = readerSchema;
+    this.fieldConverters = fieldConverters;
+  }
+
+  @Override
+  public void put(int i, Object v) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Object get(int i) {
+    if (fieldConverters.containsKey(i)) {
+      return fieldConverters.get(i).apply(record.get(i));
+    }
+    return record.get(i);
+  }
+
+  @Override
+  public Schema getSchema() {
+    return readerSchema;
+  }
+
+  @Override
+  public void put(String key, Object v) {
+    Schema.Field field = getSchema().getField(key);
+    if (field == null) {
+      throw new AvroRuntimeException("Not a valid schema field: " + key);
+    }
+    put(field.pos(), v);
+  }
+
+  @Override
+  public Object get(String key) {
+    Schema.Field field = getSchema().getField(key);
+    if (field == null) {
+      throw new AvroRuntimeException("Not a valid schema field: " + key);
+    }
+    return get(field.pos());
+  }
+
+
+  /**
+   * Avro schema evolution does not support promotion from numbers to string. 
This function returns true if
+   * it will be necessary to rewrite the record to support the evolution.
+   * NOTE: this does not determine whether the schema evolution is valid. It 
is just trying to find if the schema evolves from
+   * a number to string, as quick as possible.
+   */
+  public static Boolean 
recordNeedsRewriteForExtendedAvroSchemaEvolution(Schema writerSchema, Schema 
readerSchema) {

Review Comment:
   I understand why you need it now. Can you add unit tests for this class and 
are such type conversions covered in an end to end test?



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroCastingGenericRecord.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.hudi.common.util.ValidationUtils;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.UnaryOperator;
+
+import static org.apache.avro.Schema.Type.ARRAY;
+import static org.apache.avro.Schema.Type.MAP;
+import static org.apache.avro.Schema.Type.STRING;
+import static org.apache.avro.Schema.Type.UNION;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+
+
+/**
+ * Implementation of avro generic record that uses casting for implicit schema 
evolution
+ */
+public class AvroCastingGenericRecord implements GenericRecord {
+  private final IndexedRecord record;
+  private final Schema readerSchema;
+  private final Map<Integer, UnaryOperator<Object>> fieldConverters;
+
+  private AvroCastingGenericRecord(IndexedRecord record, Schema readerSchema, 
Map<Integer, UnaryOperator<Object>> fieldConverters) {
+    this.record = record;
+    this.readerSchema = readerSchema;
+    this.fieldConverters = fieldConverters;
+  }
+
+  @Override
+  public void put(int i, Object v) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Object get(int i) {
+    if (fieldConverters.containsKey(i)) {
+      return fieldConverters.get(i).apply(record.get(i));
+    }
+    return record.get(i);
+  }
+
+  @Override
+  public Schema getSchema() {
+    return readerSchema;
+  }
+
+  @Override
+  public void put(String key, Object v) {
+    Schema.Field field = getSchema().getField(key);
+    if (field == null) {
+      throw new AvroRuntimeException("Not a valid schema field: " + key);
+    }
+    put(field.pos(), v);
+  }
+
+  @Override
+  public Object get(String key) {
+    Schema.Field field = getSchema().getField(key);
+    if (field == null) {
+      throw new AvroRuntimeException("Not a valid schema field: " + key);
+    }
+    return get(field.pos());
+  }
+

Review Comment:
   nit: remove one extra newline.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java:
##########
@@ -205,6 +224,15 @@ public IndexedRecord next() {
         IndexedRecord record = this.reader.read(null, decoder);
         this.dis.skipBytes(recordLength);
         this.readRecords++;
+        if (whichImplementation) {

Review Comment:
   What if `whichImplementation` is true but `castSchema` is not present?



##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java:
##########
@@ -68,6 +68,17 @@ public static Schema convert(InternalSchema internalSchema, 
String name) {
     return buildAvroSchemaFromInternalSchema(internalSchema, name);
   }
 
+  /**
+   * Convert avro Schema to avro Schema.
+   *
+   * @param internalSchema internal schema.
+   * @param name the record name.
+   * @return an avro Schema.
+   */
+  public static Schema fixNullOrdering(Schema schema) {

Review Comment:
   unit test please.. All these methods are being used in 
HoodieSparkSqlWrite.deduceWriteSchema. Important to get it right.



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java:
##########
@@ -61,7 +61,7 @@ class TestHoodieSparkMergeOnReadTableClustering extends 
SparkClientFunctionalTes
   private static Stream<Arguments> testClustering() {
     // enableClusteringAsRow, doUpdates, populateMetaFields, 
preserveCommitMetadata
     return Stream.of(
-        Arguments.of(true, true, true),
+        Arguments.of(false, true, true),

Review Comment:
   is this for clustering change? If so, please do it in another PR.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java:
##########
@@ -19,47 +19,80 @@
 
 package org.apache.hudi.common.table.log;
 
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodiePayloadProps;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.io.storage.HoodieFileReader;
 
 import org.apache.avro.Schema;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Properties;
 
-/**
- * Reads records from base file and merges any updates from log files and 
provides iterable over all records in the file slice.
- */
-public class HoodieFileSliceReader<T> implements Iterator<HoodieRecord<T>> {
+public class HoodieFileSliceReader<T> extends LogFileIterator<T> {
+  private Option<Iterator<HoodieRecord>> baseFileIterator;
+  private HoodieMergedLogRecordScanner scanner;
+  private Schema schema;
+  private Properties props;
 
-  private final Iterator<HoodieRecord<T>> recordsIterator;
+  private TypedProperties payloadProps = new TypedProperties();
+  private Option<Pair<String, String>> simpleKeyGenFieldsOpt;
+  Map<String, HoodieRecord> records;
+  HoodieRecordMerger merger;
 
-  public static HoodieFileSliceReader getFileSliceReader(
-      Option<HoodieFileReader> baseFileReader, HoodieMergedLogRecordScanner 
scanner, Schema schema, Properties props, Option<Pair<String, String>> 
simpleKeyGenFieldsOpt) throws IOException {
+  public HoodieFileSliceReader(Option<HoodieFileReader> baseFileReader,
+                               HoodieMergedLogRecordScanner scanner, Schema 
schema, String preCombineField, HoodieRecordMerger merger,
+                               Properties props, Option<Pair<String, String>> 
simpleKeyGenFieldsOpt) throws IOException {
+    super(scanner);
     if (baseFileReader.isPresent()) {
-      Iterator<HoodieRecord> baseIterator = 
baseFileReader.get().getRecordIterator(schema);
-      while (baseIterator.hasNext()) {
-        
scanner.processNextRecord(baseIterator.next().wrapIntoHoodieRecordPayloadWithParams(schema,
 props,
-            simpleKeyGenFieldsOpt, scanner.isWithOperationField(), 
scanner.getPartitionNameOverride(), false, Option.empty()));
-      }
+      this.baseFileIterator = 
Option.of(baseFileReader.get().getRecordIterator(schema));
+    } else {
+      this.baseFileIterator = Option.empty();
     }
-    return new HoodieFileSliceReader(scanner.iterator());
+    this.scanner = scanner;
+    this.schema = schema;
+    this.merger = merger;
+    if (preCombineField != null) {
+      
payloadProps.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, 
preCombineField);
+    }
+    this.props = props;
+    this.simpleKeyGenFieldsOpt = simpleKeyGenFieldsOpt;
+    this.records = scanner.getRecords();
   }
 
-  private HoodieFileSliceReader(Iterator<HoodieRecord<T>> recordsItr) {
-    this.recordsIterator = recordsItr;
+  private Boolean hasNextInternal() {

Review Comment:
   Please add a unit test for this though.



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroCastingGenericRecord.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.hudi.common.util.ValidationUtils;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.UnaryOperator;
+
+import static org.apache.avro.Schema.Type.ARRAY;
+import static org.apache.avro.Schema.Type.MAP;
+import static org.apache.avro.Schema.Type.STRING;
+import static org.apache.avro.Schema.Type.UNION;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+
+
+/**
+ * Implementation of avro generic record that uses casting for implicit schema 
evolution
+ */
+public class AvroCastingGenericRecord implements GenericRecord {
+  private final IndexedRecord record;
+  private final Schema readerSchema;
+  private final Map<Integer, UnaryOperator<Object>> fieldConverters;
+
+  private AvroCastingGenericRecord(IndexedRecord record, Schema readerSchema, 
Map<Integer, UnaryOperator<Object>> fieldConverters) {
+    this.record = record;
+    this.readerSchema = readerSchema;
+    this.fieldConverters = fieldConverters;
+  }
+
+  @Override
+  public void put(int i, Object v) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Object get(int i) {
+    if (fieldConverters.containsKey(i)) {
+      return fieldConverters.get(i).apply(record.get(i));
+    }
+    return record.get(i);
+  }
+
+  @Override
+  public Schema getSchema() {
+    return readerSchema;
+  }
+
+  @Override
+  public void put(String key, Object v) {
+    Schema.Field field = getSchema().getField(key);
+    if (field == null) {
+      throw new AvroRuntimeException("Not a valid schema field: " + key);
+    }
+    put(field.pos(), v);
+  }
+
+  @Override
+  public Object get(String key) {
+    Schema.Field field = getSchema().getField(key);
+    if (field == null) {
+      throw new AvroRuntimeException("Not a valid schema field: " + key);
+    }
+    return get(field.pos());
+  }
+
+
+  /**
+   * Avro schema evolution does not support promotion from numbers to string. 
This function returns true if
+   * it will be necessary to rewrite the record to support the evolution.
+   * NOTE: this does not determine whether the schema evolution is valid. It 
is just trying to find if the schema evolves from
+   * a number to string, as quick as possible.
+   */
+  public static Boolean 
recordNeedsRewriteForExtendedAvroSchemaEvolution(Schema writerSchema, Schema 
readerSchema) {

Review Comment:
   yes please..



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroCastingGenericRecord.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.hudi.common.util.ValidationUtils;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.UnaryOperator;
+
+import static org.apache.avro.Schema.Type.ARRAY;
+import static org.apache.avro.Schema.Type.MAP;
+import static org.apache.avro.Schema.Type.STRING;
+import static org.apache.avro.Schema.Type.UNION;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+
+
+/**
+ * Implementation of avro generic record that uses casting for implicit schema 
evolution
+ */

Review Comment:
   Give an example of such an evolution. Typically, Avro itself supports type 
widening.



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -116,9 +116,26 @@ public static String getAvroRecordQualifiedName(String 
tableName) {
     return "hoodie." + sanitizedTableName + "." + sanitizedTableName + 
"_record";
   }
 
+

Review Comment:
   nit: extra newline



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -116,9 +116,26 @@ public static String getAvroRecordQualifiedName(String 
tableName) {
     return "hoodie." + sanitizedTableName + "." + sanitizedTableName + 
"_record";
   }
 
+
+  /**
+   * Validate whether the {@code targetSchema} is a valid evolution of {@code 
sourceSchema}.
+   * Basically {@link #isCompatibleProjectionOf(Schema, Schema)} but type 
promotion in the
+   * opposite direction
+   */
+  public static boolean isValidEvolutionOf(Schema sourceSchema, Schema 
targetSchema) {
+    return isProjectionOfInternal(sourceSchema, targetSchema,
+        AvroSchemaUtils::isAtomicSchemasCompatibleEvolution);
+  }
+
+  private static boolean isAtomicSchemasCompatibleEvolution(Schema 
oneAtomicType, Schema anotherAtomicType) {
+    // NOTE: Checking for compatibility of atomic types, we should ignore their
+    //       corresponding fully-qualified names (as irrelevant)
+    return isSchemaCompatible(anotherAtomicType, oneAtomicType, false, true);
+  }
+
+

Review Comment:
   nit: remove extra newline



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCompatibility.java:
##########
@@ -372,7 +372,8 @@ private SchemaCompatibilityResult 
calculateCompatibility(final Schema reader, fi
             return (writer.getType() == Type.STRING) ? result : 
result.mergedWith(typeMismatch(reader, writer, locations));
           }
           case STRING: {
-            return (writer.getType() == Type.BYTES) ? result : 
result.mergedWith(typeMismatch(reader, writer, locations));
+            return ((writer.getType() == Type.BYTES) || (writer.getType() == 
Type.INT) || (writer.getType() == Type.LONG)

Review Comment:
   better to extract to a separate method.



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -116,9 +116,26 @@ public static String getAvroRecordQualifiedName(String 
tableName) {
     return "hoodie." + sanitizedTableName + "." + sanitizedTableName + 
"_record";
   }
 
+
+  /**
+   * Validate whether the {@code targetSchema} is a valid evolution of {@code 
sourceSchema}.
+   * Basically {@link #isCompatibleProjectionOf(Schema, Schema)} but type 
promotion in the
+   * opposite direction
+   */
+  public static boolean isValidEvolutionOf(Schema sourceSchema, Schema 
targetSchema) {

Review Comment:
   unit test for as many evolution scenarios as possible.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java:
##########
@@ -153,11 +156,14 @@ protected <T> ClosableIterator<T> 
deserializeRecords(HoodieReaderContext<T> read
   }
 
   private static class RecordIterator implements 
ClosableIterator<IndexedRecord> {
+
+    private final Boolean whichImplementation = false;
     private byte[] content;
     private final SizeAwareDataInputStream dis;
     private final GenericDatumReader<IndexedRecord> reader;
     private final ThreadLocal<BinaryDecoder> decoderCache = new 
ThreadLocal<>();
-
+    private Option<Schema> castSchema = Option.empty();
+    private UnaryOperator<Object> converter;

Review Comment:
   My high-level question here is why does data block need to concerned about 
schema evolution. This should just get the evolved schema from a layer above 
isn't it?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java:
##########
@@ -153,11 +156,14 @@ protected <T> ClosableIterator<T> 
deserializeRecords(HoodieReaderContext<T> read
   }
 
   private static class RecordIterator implements 
ClosableIterator<IndexedRecord> {
+
+    private final Boolean whichImplementation = false;
     private byte[] content;
     private final SizeAwareDataInputStream dis;
     private final GenericDatumReader<IndexedRecord> reader;
     private final ThreadLocal<BinaryDecoder> decoderCache = new 
ThreadLocal<>();
-
+    private Option<Schema> castSchema = Option.empty();

Review Comment:
   maybe rename to `evolvedSchema`?



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java:
##########
@@ -71,6 +71,14 @@ public class HoodieCommonConfig extends HoodieConfig {
           + " operation will fail schema compatibility check. Set this option 
to true will make the newly added "
           + " column nullable to successfully complete the write operation.");
 
+  public static final ConfigProperty<String> ADD_NULL_FOR_DELETED_COLUMNS = 
ConfigProperty

Review Comment:
   Why add a config? If schema evolution is enabled then do this by default?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -499,14 +500,16 @@ object HoodieSparkSqlWriter {
                          latestTableSchemaOpt: Option[Schema],
                          internalSchemaOpt: Option[InternalSchema],
                          opts: Map[String, String]): Schema = {
+    val addNullForDeletedColumns = 
opts.getOrDefault(DataSourceWriteOptions.ADD_NULL_FOR_DELETED_COLUMNS.key(),

Review Comment:
   way too many flags in this method. Let's see if we can reduce them. 
Irrespective, this method needs to be well-tested.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -538,6 +538,8 @@ object DataSourceWriteOptions {
 
   val RECONCILE_SCHEMA: ConfigProperty[java.lang.Boolean] = 
HoodieCommonConfig.RECONCILE_SCHEMA
 
+  val ADD_NULL_FOR_DELETED_COLUMNS: ConfigProperty[String] = 
HoodieCommonConfig.ADD_NULL_FOR_DELETED_COLUMNS

Review Comment:
   may not need this if we agree to support it by default when schema evol is 
enabled cc @rmahindra123 



##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java:
##########
@@ -165,7 +165,10 @@ private ClosableIterator<IndexedRecord> 
getIndexedRecordIteratorInternal(Schema
       AvroReadSupport.setAvroReadSchema(conf, requestedSchema.get());
       AvroReadSupport.setRequestedProjection(conf, requestedSchema.get());
     }
-    ParquetReader<IndexedRecord> reader = new 
HoodieAvroParquetReaderBuilder<IndexedRecord>(path).withConf(conf).build();
+    ParquetReader<IndexedRecord> reader = new 
HoodieAvroParquetReaderBuilder<IndexedRecord>(path).withConf(conf)
+        .set(ParquetInputFormat.STRICT_TYPE_CHECKING, "false")
+        .set(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, "false")

Review Comment:
   Why do we need to set this? Also, why isn't this guarded by schema evol 
flag? Do we want to hardcode these configs in all cases?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala:
##########
@@ -996,6 +996,48 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
       .save(basePath)
   }
 
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", 
"SPARK"))
+  def testClusteringSamePrecombine(recordType: HoodieRecordType): Unit = {

Review Comment:
   same for this test.. remove all clustering related changes from this PR.



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolution.java:
##########
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.TestHoodieSparkUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
+
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Add test cases for out of the box schema evolution for deltastreamer:
+ * 
https://hudi.apache.org/docs/schema_evolution#out-of-the-box-schema-evolution
+ */
+@Disabled

Review Comment:
   once the PR is cleaned up, please enable it.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/CachingIterator.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import java.util.Iterator;
+
+public abstract class CachingIterator<T> implements Iterator<T> {
+
+  protected T nextRecord;
+
+  protected abstract Boolean doHasNext();

Review Comment:
   Reopening, don't see this comment resolved @jonvex 
   Feel free to add a comment and mark resolved if you guys discussed offline.



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolution.java:
##########
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.TestHoodieSparkUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
+
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Add test cases for out of the box schema evolution for deltastreamer:
+ * 
https://hudi.apache.org/docs/schema_evolution#out-of-the-box-schema-evolution
+ */
+@Disabled
+public class TestHoodieDeltaStreamerSchemaEvolution extends 
HoodieDeltaStreamerTestBase {
+
+  private String tableType;
+  private String tableBasePath;
+  private Boolean shouldCluster;
+  private Boolean shouldCompact;
+  private Boolean rowWriterEnable;
+  private Boolean addFilegroups;
+  private Boolean multiLogFiles;
+  private Boolean useSchemaProvider;
+  private Boolean hasTransformer;
+  private String sourceSchemaFile;
+  private String targetSchemaFile;
+
+  @BeforeEach
+  public void resetTest() {
+    useSchemaProvider = false;
+    hasTransformer = false;
+    sourceSchemaFile = "";
+    targetSchemaFile = "";
+  }
+
+  private HoodieDeltaStreamer deltaStreamer;

Review Comment:
   Use `HoodieStreamer` instead.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java:
##########
@@ -19,47 +19,80 @@
 
 package org.apache.hudi.common.table.log;
 
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodiePayloadProps;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.io.storage.HoodieFileReader;
 
 import org.apache.avro.Schema;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Properties;
 
-/**
- * Reads records from base file and merges any updates from log files and 
provides iterable over all records in the file slice.
- */
-public class HoodieFileSliceReader<T> implements Iterator<HoodieRecord<T>> {
+public class HoodieFileSliceReader<T> extends LogFileIterator<T> {
+  private Option<Iterator<HoodieRecord>> baseFileIterator;
+  private HoodieMergedLogRecordScanner scanner;
+  private Schema schema;
+  private Properties props;
 
-  private final Iterator<HoodieRecord<T>> recordsIterator;
+  private TypedProperties payloadProps = new TypedProperties();
+  private Option<Pair<String, String>> simpleKeyGenFieldsOpt;
+  Map<String, HoodieRecord> records;
+  HoodieRecordMerger merger;
 
-  public static HoodieFileSliceReader getFileSliceReader(
-      Option<HoodieFileReader> baseFileReader, HoodieMergedLogRecordScanner 
scanner, Schema schema, Properties props, Option<Pair<String, String>> 
simpleKeyGenFieldsOpt) throws IOException {
+  public HoodieFileSliceReader(Option<HoodieFileReader> baseFileReader,
+                               HoodieMergedLogRecordScanner scanner, Schema 
schema, String preCombineField, HoodieRecordMerger merger,
+                               Properties props, Option<Pair<String, String>> 
simpleKeyGenFieldsOpt) throws IOException {
+    super(scanner);
     if (baseFileReader.isPresent()) {
-      Iterator<HoodieRecord> baseIterator = 
baseFileReader.get().getRecordIterator(schema);
-      while (baseIterator.hasNext()) {
-        
scanner.processNextRecord(baseIterator.next().wrapIntoHoodieRecordPayloadWithParams(schema,
 props,
-            simpleKeyGenFieldsOpt, scanner.isWithOperationField(), 
scanner.getPartitionNameOverride(), false, Option.empty()));
-      }
+      this.baseFileIterator = 
Option.of(baseFileReader.get().getRecordIterator(schema));
+    } else {
+      this.baseFileIterator = Option.empty();
     }
-    return new HoodieFileSliceReader(scanner.iterator());
+    this.scanner = scanner;
+    this.schema = schema;
+    this.merger = merger;
+    if (preCombineField != null) {
+      
payloadProps.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, 
preCombineField);
+    }
+    this.props = props;
+    this.simpleKeyGenFieldsOpt = simpleKeyGenFieldsOpt;
+    this.records = scanner.getRecords();
   }
 
-  private HoodieFileSliceReader(Iterator<HoodieRecord<T>> recordsItr) {
-    this.recordsIterator = recordsItr;
+  private Boolean hasNextInternal() {

Review Comment:
   I think this should be out of the PR after rebase. This is for fixing a 
clustering issue with MOR tables I believe.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java:
##########
@@ -153,11 +156,14 @@ protected <T> ClosableIterator<T> 
deserializeRecords(HoodieReaderContext<T> read
   }
 
   private static class RecordIterator implements 
ClosableIterator<IndexedRecord> {
+
+    private final Boolean whichImplementation = false;

Review Comment:
   primitive plus rename to something more meaningful.



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolution.java:
##########
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.TestHoodieSparkUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
+
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Add test cases for out of the box schema evolution for deltastreamer:
+ * 
https://hudi.apache.org/docs/schema_evolution#out-of-the-box-schema-evolution
+ */
+@Disabled
+public class TestHoodieDeltaStreamerSchemaEvolution extends 
HoodieDeltaStreamerTestBase {

Review Comment:
   This test has good coverage. But it works with deltastreamer. It's important 
that we test the changes in `HoodieSparkSqlWriter` too. So, distributing the 
tests here between deltastreamer and datasource would be advisable. Ideally, 
you can add a similar test suite for spark sql writer but test runtime is 
another issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to