tandonraghav opened a new issue #2919:
URL: https://github.com/apache/hudi/issues/2919


   I am facing issue in Schema Evolution. While adding a new field to the Spark 
DF, it is giving exception if there are previous Log files/Records which do not 
have that field.
   
   I can see *type* is reversed in *test* and there is no default value(In 
Hoodie Log files). Is it because of the SchemaConvertors? 
   
   **Environment Description**
   
   * Hudi version : 0.8.0
   
   * Spark version : 2.4
   
   * Hive version : 
   
   * Hadoop version :
   
   * Storage (HDFS/S3/GCS..) :  HDFS
   
   * Running on Docker? (yes/no) : no
   
   Code 
   
   1. Create the DF
   
   ````
   Dataset<GenericRecord> genericRecordDfFromKafka = db.map((MapFunction<Row, 
GenericRecord>) row -> {
                                       String docId=((GenericRowWithSchema) 
row.getAs("json_value")).getAs("doc_id");
                                       String dbName = ((GenericRowWithSchema) 
row.getAs("json_value")).getAs("collection");
                                       String before = ((GenericRowWithSchema) 
row.getAs("json_value")).getAs("o");
                                       String after = ((GenericRowWithSchema) 
row.getAs("json_value")).getAs("o2");
                                       String op = ((GenericRowWithSchema) 
row.getAs("json_value")).getAs("op");
                                       Long ts = (((GenericRowWithSchema) 
row.getAs("json_value")).getStruct(5).getStruct(0).getAs("t"));
                                       return convertOplogToAvro(dbName, 
before, after, ts,docId,op,schemaStr);
                                   }, Encoders.bean(GenericRecord.class));
                                   
    Dataset<Row> ds = 
AvroConversionUtils.createDataFrame(genericRecordDfFromKafka.rdd(),
                                           schemaStr, sparkSession);            
                   
   
    Dataset<Row> insertedDs=ds.select("*").where(ds.col("op").notEqual("d"));
    persistDFInHudi(insertedDs, db_name, tablePath,hiveUrl);
    
    private void persistDFInHudi(Dataset<Row> ds, String dbName, String 
tablePath, String hiveUrl) {
           ds.write()
                   .format("org.apache.hudi")
                   .options(QuickstartUtils.getQuickstartWriteConfigs())
                   .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL())
                   .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL())
                   .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), 
"ts_ms")
                   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), 
"id")
                   
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "db_name")
                   .option(HoodieWriteConfig.TABLE_NAME, dbName)
                   .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, true)
                   .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY(), 
"true")
                   .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), dbName)
                   
.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "db_name")
                   
.option(DataSourceWriteOptions.DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL(), 
"false")
                   
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, 100)
                   
.option(HoodieCompactionConfig.INLINE_COMPACT_TIME_DELTA_SECONDS_PROP, 2000)
                   
.option(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY_PROP,
                           
String.valueOf(CompactionTriggerStrategy.NUM_OR_TIME))
                   .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), 
"some_db").
                   option(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), hiveUrl)
                   
/*.option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),
                           NonPartitionedExtractor.class.getName())*/
                   
.option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), true)
                   
/*.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(),
                           NonpartitionedKeyGenerator.class.getName())*/
                   
/*.option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),
                           NonPartitionedExtractor.class.getName())*/
                   
.option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),
                           MultiPartKeysValueExtractor.class.getName())
                   .option(HoodieIndexConfig.INDEX_TYPE_PROP, 
HoodieIndex.IndexType.SIMPLE.toString())
                   .mode(SaveMode.Append)
                   .save(tablePath);
       }
       
       private GenericRecord convertOplogToAvro(String dbName, String before, 
String after, Long ts, String id, String op,
                                                String schemaStr) throws 
IOException, RestClientException {
           Schema schema = new Schema.Parser().parse(schemaStr);
           Document dataTypeDoc=new Document();
           if(before!=null){
               Document document = Document.parse(before);
               dataTypeDoc = Utils.convertToAvroDoc(document);
           }
           dataTypeDoc.put("ts_ms", ts);
           dataTypeDoc.put("db_name", dbName);
           dataTypeDoc.put("id", id);
           dataTypeDoc.put("op",op);
   
           Document finalDataTypeDoc = dataTypeDoc;
           Stream.of(DefaultFields.values())
                   .forEach(field -> finalDataTypeDoc.put(field.getName(), 
field.getValueFrom(finalDataTypeDoc)));
   
           return new 
MercifulJsonConverter().convert(dataTypeDoc.toJson(JsonWriterSettings.builder()
                   .outputMode(JsonMode.RELAXED)
                   .dateTimeConverter(new JsonDateConvertor())
                   .objectIdConverter(new ObjectIdConverter())
                   .build()), schema);
       }
    
    ````
   
   2. Persist this ds with Original Schema first and call it few times, to make 
sure some uncompacted Log files are there.
   3. Persist this ds again with New schema and it will throw Error 
   **Caused by: org.apache.avro.AvroTypeException: Found 
hoodie.products.products_record, expecting hoodie.products.products_record, 
missing required field test2**
   
   Our schema is dynamic and I am not removing any field rather adding a field 
to the end, then also it is failing.
   
   Original Schema 
   
   ````
   {
     "type": "record",
     "name": "foo",
     "namespace": "products",
     "fields": [
       {
         "name": "id",
         "type": [
           "null",
           "string"
         ],
         "default": null
       },
       {
         "name": "product_id",
         "type": [
           "null",
           "string"
         ],
         "default": null
       },
       {
         "name": "db_name",
         "type": [
           "null",
           "string"
         ],
         "default": null
       },
       {
         "name": "catalog_id",
         "type": [
           "null",
           "string"
         ],
         "default": null
       },
       {
         "name": "feed_id",
         "type": [
           "null",
           "string"
         ],
         "default": null
       },
       {
         "name": "ts_ms",
         "type": [
           "null",
           "double"
         ],
         "default": null
       },
       {
         "name": "op",
         "type": [
           "null",
           "string"
         ],
         "default": null
       },
       {
         "name": "test",
         "type": [
           "null",
           "string"
         ],
         "default": null
       }
     ]
   }
   ````
   
   Changed Schema
   
   ````
   {
     "type": "record",
     "name": "foo",
     "namespace": "products",
     "fields": [
       {
         "name": "id",
         "type": [
           "null",
           "string"
         ],
         "default": null
       },
       {
         "name": "product_id",
         "type": [
           "null",
           "string"
         ],
         "default": null
       },
       {
         "name": "db_name",
         "type": [
           "null",
           "string"
         ],
         "default": null
       },
       {
         "name": "catalog_id",
         "type": [
           "null",
           "string"
         ],
         "default": null
       },
       {
         "name": "feed_id",
         "type": [
           "null",
           "string"
         ],
         "default": null
       },
       {
         "name": "ts_ms",
         "type": [
           "null",
           "double"
         ],
         "default": null
       },
       {
         "name": "op",
         "type": [
           "null",
           "string"
         ],
         "default": null
       },
       {
         "name": "test",
         "type": [
           "null",
           "string"
         ],
         "default": null
       },
       {
         "name": "test2",
         "type": [
           "null",
           "string"
         ],
         "default": null
       }
     ]
   }
   ````
   
   Added a column **test2** in the end with default value.
    
   
   
   Schema shown by Hoodie in Logs
   
   ```{
     "type": "record",
     "name": "Max_IND_record",
     "namespace": "hoodie.Max_IND",
     "fields": [
       {
         "name": "_hoodie_commit_time",
         "type": [
           "null",
           "string"
         ],
         "doc": "",
         "default": null
       },
       {
         "name": "_hoodie_commit_seqno",
         "type": [
           "null",
           "string"
         ],
         "doc": "",
         "default": null
       },
       {
         "name": "_hoodie_record_key",
         "type": [
           "null",
           "string"
         ],
         "doc": "",
         "default": null
       },
       {
         "name": "_hoodie_partition_path",
         "type": [
           "null",
           "string"
         ],
         "doc": "",
         "default": null
       },
       {
         "name": "_hoodie_file_name",
         "type": [
           "null",
           "string"
         ],
         "doc": "",
         "default": null
       },
       {
         "name": "id",
         "type": [
           "string",
           "null"
         ]
       },
       {
         "name": "product_id",
         "type": [
           "string",
           "null"
         ]
       },
       {
         "name": "db_name",
         "type": [
           "string",
           "null"
         ]
       },
       {
         "name": "catalog_id",
         "type": [
           "string",
           "null"
         ]
       },
       {
         "name": "feed_id",
         "type": [
           "string",
           "null"
         ]
       },
       {
         "name": "ts_ms",
         "type": [
           "double",
           "null"
         ]
       },
       {
         "name": "op",
         "type": [
           "string",
           "null"
         ]
       },
       {
         "name": "test",
         "type": [
           "string",
           "null"
         ]
       }
     ]
   }
   ````
   
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to