tandonraghav opened a new issue #2919:
URL: https://github.com/apache/hudi/issues/2919
I am facing an issue with schema evolution. When I add a new field to the Spark
DF, the write throws an exception if there are earlier log files/records that do
not have that field.
I can see that the union *type* order is reversed for *test* and there is no
default value (in the Hoodie log files). Is this caused by the schema converters?
**Environment Description**
* Hudi version : 0.8.0
* Spark version : 2.4
* Hive version :
* Hadoop version :
* Storage (HDFS/S3/GCS..) : HDFS
* Running on Docker? (yes/no) : no
Code
1. Create the DF
````
Dataset<GenericRecord> genericRecordDfFromKafka = db.map((MapFunction<Row, GenericRecord>) row -> {
    String docId = ((GenericRowWithSchema) row.getAs("json_value")).getAs("doc_id");
    String dbName = ((GenericRowWithSchema) row.getAs("json_value")).getAs("collection");
    String before = ((GenericRowWithSchema) row.getAs("json_value")).getAs("o");
    String after = ((GenericRowWithSchema) row.getAs("json_value")).getAs("o2");
    String op = ((GenericRowWithSchema) row.getAs("json_value")).getAs("op");
    Long ts = ((GenericRowWithSchema) row.getAs("json_value")).getStruct(5).getStruct(0).getAs("t");
    return convertOplogToAvro(dbName, before, after, ts, docId, op, schemaStr);
}, Encoders.bean(GenericRecord.class));

Dataset<Row> ds = AvroConversionUtils.createDataFrame(genericRecordDfFromKafka.rdd(), schemaStr, sparkSession);
Dataset<Row> insertedDs = ds.select("*").where(ds.col("op").notEqual("d"));
persistDFInHudi(insertedDs, db_name, tablePath, hiveUrl);
private void persistDFInHudi(Dataset<Row> ds, String dbName, String tablePath, String hiveUrl) {
    ds.write()
        .format("org.apache.hudi")
        .options(QuickstartUtils.getQuickstartWriteConfigs())
        .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL())
        .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL())
        .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "ts_ms")
        .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "id")
        .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "db_name")
        .option(HoodieWriteConfig.TABLE_NAME, dbName)
        .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, true)
        .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY(), "true")
        .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), dbName)
        .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "db_name")
        .option(DataSourceWriteOptions.DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL(), "false")
        .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, 100)
        .option(HoodieCompactionConfig.INLINE_COMPACT_TIME_DELTA_SECONDS_PROP, 2000)
        .option(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY_PROP,
                String.valueOf(CompactionTriggerStrategy.NUM_OR_TIME))
        .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), "some_db")
        .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), hiveUrl)
        .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), true)
        /*.option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),
                NonPartitionedExtractor.class.getName())*/
        /*.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(),
                NonpartitionedKeyGenerator.class.getName())*/
        .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),
                MultiPartKeysValueExtractor.class.getName())
        .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.SIMPLE.toString())
        .mode(SaveMode.Append)
        .save(tablePath);
}
private GenericRecord convertOplogToAvro(String dbName, String before, String after,
                                         Long ts, String id, String op, String schemaStr)
        throws IOException, RestClientException {
    Schema schema = new Schema.Parser().parse(schemaStr);
    Document dataTypeDoc = new Document();
    if (before != null) {
        Document document = Document.parse(before);
        dataTypeDoc = Utils.convertToAvroDoc(document);
    }
    dataTypeDoc.put("ts_ms", ts);
    dataTypeDoc.put("db_name", dbName);
    dataTypeDoc.put("id", id);
    dataTypeDoc.put("op", op);
    Document finalDataTypeDoc = dataTypeDoc;
    Stream.of(DefaultFields.values())
        .forEach(field -> finalDataTypeDoc.put(field.getName(), field.getValueFrom(finalDataTypeDoc)));
    return new MercifulJsonConverter().convert(dataTypeDoc.toJson(JsonWriterSettings.builder()
            .outputMode(JsonMode.RELAXED)
            .dateTimeConverter(new JsonDateConvertor())
            .objectIdConverter(new ObjectIdConverter())
            .build()), schema);
}
````
2. Persist this ds with the original schema first, and run it a few times so
that some uncompacted log files exist.
3. Persist this ds again with the new schema; it throws:
**Caused by: org.apache.avro.AvroTypeException: Found
hoodie.products.products_record, expecting hoodie.products.products_record,
missing required field test2**
Our schema is dynamic. I am not removing any field, only appending one at the
end, yet it still fails.
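For context, my understanding of Avro's schema-resolution rules is that when the reader schema has a field the writer schema lacks, the reader field's default is used — but only if a default is actually present. A minimal sketch of that check (stdlib Python only; `fields_missing_defaults` is a hypothetical helper, and the schemas are cut-down versions of the ones below):

```python
import json

def fields_missing_defaults(writer_schema: dict, reader_schema: dict) -> list:
    """Per Avro schema-resolution rules, a reader field absent from the
    writer schema must carry a default value; return the ones that don't."""
    writer_fields = {f["name"] for f in writer_schema["fields"]}
    return [
        f["name"]
        for f in reader_schema["fields"]
        if f["name"] not in writer_fields and "default" not in f
    ]

# Writer schema as stored in the old log files (only "test").
old = json.loads("""{"type":"record","name":"foo","fields":[
    {"name":"test","type":["null","string"],"default":null}]}""")
# Reader schema with "test2" added *with* a default: resolvable.
new_ok = json.loads("""{"type":"record","name":"foo","fields":[
    {"name":"test","type":["null","string"],"default":null},
    {"name":"test2","type":["null","string"],"default":null}]}""")
# Reader schema with "test2" but the default dropped: not resolvable.
new_bad = json.loads("""{"type":"record","name":"foo","fields":[
    {"name":"test","type":["null","string"],"default":null},
    {"name":"test2","type":["null","string"]}]}""")

print(fields_missing_defaults(old, new_ok))   # [] -> old log records resolve fine
print(fields_missing_defaults(old, new_bad))  # ['test2'] -> "missing required field test2"
```

That second case looks exactly like the `AvroTypeException` above, which is why the dropped default in the stored schema matters.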
Original Schema
````
{
"type": "record",
"name": "foo",
"namespace": "products",
"fields": [
{
"name": "id",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "product_id",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "db_name",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "catalog_id",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "feed_id",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "ts_ms",
"type": [
"null",
"double"
],
"default": null
},
{
"name": "op",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "test",
"type": [
"null",
"string"
],
"default": null
}
]
}
````
Changed Schema
````
{
"type": "record",
"name": "foo",
"namespace": "products",
"fields": [
{
"name": "id",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "product_id",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "db_name",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "catalog_id",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "feed_id",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "ts_ms",
"type": [
"null",
"double"
],
"default": null
},
{
"name": "op",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "test",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "test2",
"type": [
"null",
"string"
],
"default": null
}
]
}
````
I added a column **test2** at the end, with a default value.
Schema shown by Hoodie in the logs:
````
{
"type": "record",
"name": "Max_IND_record",
"namespace": "hoodie.Max_IND",
"fields": [
{
"name": "_hoodie_commit_time",
"type": [
"null",
"string"
],
"doc": "",
"default": null
},
{
"name": "_hoodie_commit_seqno",
"type": [
"null",
"string"
],
"doc": "",
"default": null
},
{
"name": "_hoodie_record_key",
"type": [
"null",
"string"
],
"doc": "",
"default": null
},
{
"name": "_hoodie_partition_path",
"type": [
"null",
"string"
],
"doc": "",
"default": null
},
{
"name": "_hoodie_file_name",
"type": [
"null",
"string"
],
"doc": "",
"default": null
},
{
"name": "id",
"type": [
"string",
"null"
]
},
{
"name": "product_id",
"type": [
"string",
"null"
]
},
{
"name": "db_name",
"type": [
"string",
"null"
]
},
{
"name": "catalog_id",
"type": [
"string",
"null"
]
},
{
"name": "feed_id",
"type": [
"string",
"null"
]
},
{
"name": "ts_ms",
"type": [
"double",
"null"
]
},
{
"name": "op",
"type": [
"string",
"null"
]
},
{
"name": "test",
"type": [
"string",
"null"
]
}
]
}
````
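Note the reversed unions above: the Avro spec says the default value of a union-typed field must correspond to the *first* branch of the union, so once the union is stored as `["string", "null"]` a `null` default is no longer valid and (as seen here) gets dropped entirely. A sketch of that rule (stdlib Python only; `union_default_is_valid` is a hypothetical helper, not a Hudi API):

```python
def union_default_is_valid(field: dict) -> bool:
    """Avro spec: a union-typed field's default must match the *first*
    branch of the union, so a null default requires the union to start
    with "null". A field with no default at all cannot be back-filled."""
    if "default" not in field:
        return False
    branches = field["type"]
    if not isinstance(branches, list):
        return True  # non-union type; assume the default matches it
    return not (field["default"] is None and branches[0] != "null")

# Field as written in my original schema: null-first union, default null.
ok = {"name": "test2", "type": ["null", "string"], "default": None}
# Field as Hoodie logs it back: union reversed, default dropped.
reversed_no_default = {"name": "test", "type": ["string", "null"]}

print(union_default_is_valid(ok))                   # True
print(union_default_is_valid(reversed_no_default))  # False
```

If the round-trip through the Spark/Avro schema converters is what reverses the union and drops the default, that would explain why old log records can no longer be read against the evolved schema.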