vinothchandar commented on a change in pull request #2927:
URL: https://github.com/apache/hudi/pull/2927#discussion_r638213323
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
##########
@@ -62,11 +65,16 @@ public TableSchemaResolver(HoodieTableMetaClient metaClient) {
}
/**
- * Gets the schema for a hoodie table. Depending on the type of table, read from any file written in the latest
- * commit. We will assume that the schema has not changed within a single atomic write.
+ * @return {@code true} if any commits are found, else {@code false}.
+ */
+ private boolean isTimelineNonEmpty() {
Review comment:
hmmm, does it really go here? We should only add methods that are relevant to this class, i.e. schema resolution.
##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
##########
@@ -110,6 +110,20 @@ public static FileSystem getFs(String path, Configuration conf, boolean localByD
return getFs(path, conf);
}
+ /**
+ * Check if table already exists in the given path.
+ * @param path base path of the table.
+ * @param fs instance of {@link FileSystem}.
+ * @return {@code true} if table exists. {@code false} otherwise.
+ */
+ public static boolean isTableExists(String path, FileSystem fs) {
Review comment:
are we sure there isn't an existing method for this?
Also, shouldn't we rethrow the IOException rather than assuming it's false?
What if a network error made `exists()` fail, but the table actually existed?
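A minimal sketch of the behavior the reviewer is asking for (names and the `.hoodie` marker path are illustrative; Hudi's real utility takes a Hadoop `FileSystem`, which is stood in for here by a small functional interface so the idea is self-contained): rethrow the checked `IOException` as an unchecked exception instead of silently mapping every failure to "table does not exist".

```java
import java.io.IOException;
import java.io.UncheckedIOException;

public class TableExistsSketch {

    // Stand-in for FileSystem.exists(path); may throw IOException on e.g. network errors.
    interface ExistsCheck {
        boolean exists(String path) throws IOException;
    }

    // Return the real answer when the check succeeds; rethrow (wrapped as
    // unchecked) when the check itself fails, so a transient network error
    // is never confused with "the table does not exist".
    public static boolean isTableExists(String basePath, ExistsCheck fs) {
        try {
            return fs.exists(basePath + "/.hoodie");
        } catch (IOException e) {
            throw new UncheckedIOException("Could not check if table exists at " + basePath, e);
        }
    }
}
```

The key design point is that only a successful `exists()` call is allowed to produce `false`; an exceptional check propagates to the caller.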
##########
File path:
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
##########
@@ -343,25 +350,67 @@ object AvroConversionHelper {
case structType: StructType =>
val schema: Schema = convertStructTypeToAvroSchema(structType, structName, recordNamespace)
val childNameSpace = if (recordNamespace != "") s"$recordNamespace.$structName" else structName
- val fieldConverters = structType.fields.map(field =>
- createConverterToAvro(
- field.dataType,
- field.name,
- childNameSpace))
- (item: Any) => {
- if (item == null) {
- null
- } else {
- val record = new Record(schema)
- val convertersIterator = fieldConverters.iterator
- val fieldNamesIterator = dataType.asInstanceOf[StructType].fieldNames.iterator
- val rowIterator = item.asInstanceOf[Row].toSeq.iterator
+ if (honorSchemaMismatch) {
Review comment:
can you add some context on what exactly is the fix here?
##########
File path:
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
##########
@@ -343,25 +350,67 @@ object AvroConversionHelper {
case structType: StructType =>
val schema: Schema = convertStructTypeToAvroSchema(structType, structName, recordNamespace)
val childNameSpace = if (recordNamespace != "") s"$recordNamespace.$structName" else structName
- val fieldConverters = structType.fields.map(field =>
- createConverterToAvro(
- field.dataType,
- field.name,
- childNameSpace))
- (item: Any) => {
- if (item == null) {
- null
- } else {
- val record = new Record(schema)
- val convertersIterator = fieldConverters.iterator
- val fieldNamesIterator = dataType.asInstanceOf[StructType].fieldNames.iterator
- val rowIterator = item.asInstanceOf[Row].toSeq.iterator
+ if (honorSchemaMismatch) {
Review comment:
let's pull this into its own method? Let's please keep our guidelines in mind :)
https://hudi.apache.org/contributing#coding-guidelines
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
##########
@@ -353,6 +361,39 @@ public static boolean isSchemaCompatible(String oldSchema, String newSchema) {
return isSchemaCompatible(new Schema.Parser().parse(oldSchema), new Schema.Parser().parse(newSchema));
}
+ /**
+ * Get latest schema either from incoming schema or table schema.
+ * @param incomingSchema incoming batch's schema.
+ * @param convertTableSchemaToAddNamespace {@code true} if table schema needs to be converted. {@code false} otherwise.
+ * @param converterFn converter function to be called over table schema. In DeltaSync flow, table schema needs to convert
+ * from avro -> df -> avro to add the namespace in the schema. But in spark writer flow, no such conversion is required.
+ * This package does not have access to some elements needed for conversion, hence added it as function call rather than embedding here.
+ * @return the latest schema.
+ */
+ public Schema getLatestSchema(Schema incomingSchema, boolean convertTableSchemaToAddNamespace,
Review comment:
can we standardize on writeSchema and tableSchema, instead of introducing new terminology like `incoming`?
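For illustration, the naming suggestion amounts to something like the sketch below. This is not the actual Hudi signature: the `Schema` record is a toy stand-in for `org.apache.avro.Schema`, and the method body is a placeholder — only the `writeSchema`/`tableSchema` parameter naming is the point.

```java
import java.util.function.Function;

public class SchemaNamingSketch {

    // Toy stand-in for org.apache.avro.Schema, just to keep the sketch self-contained.
    public record Schema(String name) {}

    /**
     * Suggested naming: "writeSchema" (the batch being written) and "tableSchema"
     * (what is already on disk), rather than introducing "incomingSchema".
     * Body is a placeholder; real resolution logic lives in TableSchemaResolver.
     */
    public static Schema getLatestSchema(Schema writeSchema, Schema tableSchema,
                                         boolean convertTableSchemaToAddNamespace,
                                         Function<Schema, Schema> converterFn) {
        return convertTableSchemaToAddNamespace ? converterFn.apply(tableSchema) : writeSchema;
    }
}
```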
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
##########
@@ -353,6 +361,39 @@ public static boolean isSchemaCompatible(String oldSchema, String newSchema) {
return isSchemaCompatible(new Schema.Parser().parse(oldSchema), new Schema.Parser().parse(newSchema));
}
+ /**
+ * Get latest schema either from incoming schema or table schema.
+ * @param incomingSchema incoming batch's schema.
+ * @param convertTableSchemaToAddNamespace {@code true} if table schema needs to be converted. {@code false} otherwise.
+ * @param converterFn converter function to be called over table schema. In DeltaSync flow, table schema needs to convert
Review comment:
please avoid any references to layers of code so far up the stack from code that's common to everything, like hudi-common
##########
File path:
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
##########
@@ -343,25 +350,67 @@ object AvroConversionHelper {
case structType: StructType =>
val schema: Schema = convertStructTypeToAvroSchema(structType, structName, recordNamespace)
val childNameSpace = if (recordNamespace != "") s"$recordNamespace.$structName" else structName
- val fieldConverters = structType.fields.map(field =>
- createConverterToAvro(
- field.dataType,
- field.name,
- childNameSpace))
- (item: Any) => {
- if (item == null) {
- null
- } else {
- val record = new Record(schema)
- val convertersIterator = fieldConverters.iterator
- val fieldNamesIterator = dataType.asInstanceOf[StructType].fieldNames.iterator
- val rowIterator = item.asInstanceOf[Row].toSeq.iterator
+ if (honorSchemaMismatch) {
Review comment:
+1
##########
File path:
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/SparkBasedReader.java
##########
@@ -50,8 +50,8 @@
.load(JavaConverters.asScalaIteratorConverter(listOfPaths.iterator()).asScala().toSeq());
return HoodieSparkUtils
- .createRdd(dataSet.toDF(), structName.orElse(RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME),
- nameSpace.orElse(RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE))
+ .createRdd(dataSet.toDF(), null, structName.orElse(RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME),
Review comment:
please don't use null as a sentinel anywhere
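Hudi carries its own `org.apache.hudi.common.util.Option` type for exactly this purpose; the sketch below uses `java.util.Optional` to illustrate the same idea (the method and parameter names are hypothetical, not the real `HoodieSparkUtils.createRdd` signature): an explicit empty value makes "no reader schema supplied" visible both in the signature and at the call site, instead of a bare `null`.

```java
import java.util.Optional;

public class NoNullSentinelSketch {

    // Instead of createRdd(df, null, structName, nameSpace), take an Optional
    // so the absent-schema case is an explicit, type-checked state.
    public static String describeCreateRdd(Optional<String> readerSchema,
                                           String structName, String nameSpace) {
        return readerSchema
            .map(s -> "createRdd with reader schema " + s)
            .orElse("createRdd inferring schema for " + nameSpace + "." + structName);
    }
}
```

Call sites then pass `Optional.empty()` rather than `null`, which also survives refactors that add overloads or reorder parameters.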
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]