vinothchandar commented on a change in pull request #2927:
URL: https://github.com/apache/hudi/pull/2927#discussion_r638213323



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
##########
@@ -62,11 +65,16 @@ public TableSchemaResolver(HoodieTableMetaClient metaClient) {
   }
 
   /**
-   * Gets the schema for a hoodie table. Depending on the type of table, read from any file written in the latest
-   * commit. We will assume that the schema has not changed within a single atomic write.
+   * @return {@code true} if any commits are found, else {@code false}.
+   */
+  private boolean isTimelineNonEmpty() {

Review comment:
       Hmmm, does it really go here? We should only add methods that are relevant to this class, i.e. schema resolution.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
##########
@@ -110,6 +110,20 @@ public static FileSystem getFs(String path, Configuration conf, boolean localByD
     return getFs(path, conf);
   }
 
+  /**
+   * Check if table already exists in the given path.
+   * @param path base path of the table.
+   * @param fs instance of {@link FileSystem}.
+   * @return {@code true} if table exists. {@code false} otherwise.
+   */
+  public static boolean isTableExists(String path, FileSystem fs) {

Review comment:
       Are we sure there isn't an existing method for this?
   Also, shouldn't we rethrow the IOException rather than assuming it's false? What if a network error made `exists()` fail, but the table actually existed?
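The concern can be sketched like this. This is a minimal, self-contained Java sketch, not Hudi's actual code: the `FileSystem` interface and the `.hoodie` marker path are stand-ins for the Hadoop/Hudi types, and the real codebase would likely wrap the failure in its own `HoodieIOException` rather than `UncheckedIOException`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

public class TableExistenceCheck {

  // Stand-in for Hadoop's FileSystem, just to keep the sketch self-contained.
  interface FileSystem {
    boolean exists(String path) throws IOException;
  }

  // Sketch of the reviewer's suggestion: propagate the IOException instead of
  // mapping every failure to "table does not exist". A transient network error
  // must not be read as "no table".
  public static boolean isTableExists(String path, FileSystem fs) {
    try {
      // Checking for a ".hoodie" metadata folder is an assumption of this sketch.
      return fs.exists(path + "/.hoodie");
    } catch (IOException e) {
      throw new UncheckedIOException("Could not check if table exists at " + path, e);
    }
  }
}
```

With this shape, a caller sees the distinction between "the table is absent" and "the existence check itself failed", instead of both collapsing to `false`.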

##########
File path: hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
##########
@@ -343,25 +350,67 @@ object AvroConversionHelper {
       case structType: StructType =>
        val schema: Schema = convertStructTypeToAvroSchema(structType, structName, recordNamespace)
        val childNameSpace = if (recordNamespace != "") s"$recordNamespace.$structName" else structName
-        val fieldConverters = structType.fields.map(field =>
-          createConverterToAvro(
-            field.dataType,
-            field.name,
-            childNameSpace))
-        (item: Any) => {
-          if (item == null) {
-            null
-          } else {
-            val record = new Record(schema)
-            val convertersIterator = fieldConverters.iterator
-            val fieldNamesIterator = dataType.asInstanceOf[StructType].fieldNames.iterator
-            val rowIterator = item.asInstanceOf[Row].toSeq.iterator
+        if (honorSchemaMismatch) {

Review comment:
       Can you add some context on what exactly the fix is here?

##########
File path: hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
##########
@@ -343,25 +350,67 @@ object AvroConversionHelper {
       case structType: StructType =>
        val schema: Schema = convertStructTypeToAvroSchema(structType, structName, recordNamespace)
        val childNameSpace = if (recordNamespace != "") s"$recordNamespace.$structName" else structName
-        val fieldConverters = structType.fields.map(field =>
-          createConverterToAvro(
-            field.dataType,
-            field.name,
-            childNameSpace))
-        (item: Any) => {
-          if (item == null) {
-            null
-          } else {
-            val record = new Record(schema)
-            val convertersIterator = fieldConverters.iterator
-            val fieldNamesIterator = dataType.asInstanceOf[StructType].fieldNames.iterator
-            val rowIterator = item.asInstanceOf[Row].toSeq.iterator
+        if (honorSchemaMismatch) {

Review comment:
       Let's pull this into its own method, and let's please keep our guidelines in mind :)
   https://hudi.apache.org/contributing#coding-guidelines

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
##########
@@ -353,6 +361,39 @@ public static boolean isSchemaCompatible(String oldSchema, String newSchema) {
     return isSchemaCompatible(new Schema.Parser().parse(oldSchema), new Schema.Parser().parse(newSchema));
   }
 
+  /**
+   * Get latest schema either from incoming schema or table schema.
+   * @param incomingSchema incoming batch's schema.
+   * @param convertTableSchemaToAddNamespace {@code true} if table schema needs to be converted. {@code false} otherwise.
+   * @param converterFn converter function to be called over table schema. In DeltaSync flow, table schema needs to convert
+   * from avro -> df -> avro to add the namespace in the schema. But in spark writer flow, no such conversion is required.
+   * This package does not have access to some elements needed for conversion, hence added it as function call rather than embedding here.
+   * @return the latest schema.
+   */
+  public Schema getLatestSchema(Schema incomingSchema, boolean convertTableSchemaToAddNamespace,

Review comment:
       Can we standardize on `writeSchema` and `tableSchema`, instead of introducing new terminology like `incoming`?
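The rename the reviewer suggests might look like the following. This is a hypothetical sketch, not Hudi's API: the nested `Schema` record stands in for `org.apache.avro.Schema` so the example compiles on its own, and the selection logic is illustrative only.

```java
import java.util.Optional;
import java.util.function.UnaryOperator;

public class SchemaResolution {

  // Stand-in for org.apache.avro.Schema, to keep the sketch self-contained.
  record Schema(String name) {}

  // Hypothetical signature after the rename: "incomingSchema" becomes
  // "writeSchema", matching the writeSchema/tableSchema terminology used
  // elsewhere. An absent table schema is modeled with Optional, not null.
  public static Schema getLatestSchema(Schema writeSchema, Optional<Schema> tableSchema,
                                       boolean convertTableSchemaToAddNamespace,
                                       UnaryOperator<Schema> converterFn) {
    return tableSchema
        .map(s -> convertTableSchemaToAddNamespace ? converterFn.apply(s) : s)
        .orElse(writeSchema);
  }
}
```

The point of the rename is purely readability: a caller reading `writeSchema` and `tableSchema` immediately knows which side of the reconciliation each argument sits on.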

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
##########
@@ -353,6 +361,39 @@ public static boolean isSchemaCompatible(String oldSchema, String newSchema) {
     return isSchemaCompatible(new Schema.Parser().parse(oldSchema), new Schema.Parser().parse(newSchema));
   }
 
+  /**
+   * Get latest schema either from incoming schema or table schema.
+   * @param incomingSchema incoming batch's schema.
+   * @param convertTableSchemaToAddNamespace {@code true} if table schema needs to be converted. {@code false} otherwise.
+   * @param converterFn converter function to be called over table schema. In DeltaSync flow, table schema needs to convert

Review comment:
       Please avoid references to layers of code so far up the stack from code that's common to everything, like hudi-common.

##########
File path: hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
##########
@@ -343,25 +350,67 @@ object AvroConversionHelper {
       case structType: StructType =>
        val schema: Schema = convertStructTypeToAvroSchema(structType, structName, recordNamespace)
        val childNameSpace = if (recordNamespace != "") s"$recordNamespace.$structName" else structName
-        val fieldConverters = structType.fields.map(field =>
-          createConverterToAvro(
-            field.dataType,
-            field.name,
-            childNameSpace))
-        (item: Any) => {
-          if (item == null) {
-            null
-          } else {
-            val record = new Record(schema)
-            val convertersIterator = fieldConverters.iterator
-            val fieldNamesIterator = dataType.asInstanceOf[StructType].fieldNames.iterator
-            val rowIterator = item.asInstanceOf[Row].toSeq.iterator
+        if (honorSchemaMismatch) {

Review comment:
       +1 

##########
File path: hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/SparkBasedReader.java
##########
@@ -50,8 +50,8 @@
         .load(JavaConverters.asScalaIteratorConverter(listOfPaths.iterator()).asScala().toSeq());
 
     return HoodieSparkUtils
-        .createRdd(dataSet.toDF(), structName.orElse(RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME),
-            nameSpace.orElse(RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE))
+        .createRdd(dataSet.toDF(), null, structName.orElse(RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME),

Review comment:
       Please don't use `null` as a sentinel anywhere.
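The alternative the reviewer has in mind can be sketched with `Optional` (Hudi has its own `org.apache.hudi.common.util.Option` for the same purpose). The method and parameter names below are illustrative, not the actual `createRdd` API.

```java
import java.util.Optional;

public class CreateRddArgs {

  // Sketch: model "no reader schema supplied" explicitly with Optional rather
  // than passing null. Absence is then visible in the signature, and callers
  // cannot accidentally dereference a missing schema.
  public static String describeCall(Optional<String> readerSchema, String structName, String namespace) {
    String schemaPart = readerSchema.orElse("<derive from rows>");
    return structName + "@" + namespace + " schema=" + schemaPart;
  }
}
```

A call site that has no schema then reads `describeCall(Optional.empty(), ...)`, which documents the intent that `createRdd(dataSet.toDF(), null, ...)` hides.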




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
