This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e7ce19  Read and apply schema for each log block from the metadata header instead of the latest schema
9e7ce19 is described below

commit 9e7ce19b06c5b6235857e22dc039bb726b61f8ca
Author: Nishith Agarwal <[email protected]>
AuthorDate: Tue Apr 16 12:47:09 2019 -0700

    Read and apply schema for each log block from the metadata header instead of the latest schema
---
 .../common/table/log/HoodieLogFileReader.java      |  9 ++++-
 .../common/table/log/HoodieLogFormatTest.java      | 45 ++++++++++++++++++++++
 2 files changed, 52 insertions(+), 2 deletions(-)

diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java
index 836870e..878457f 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java
@@ -197,12 +197,17 @@ class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
     switch (blockType) {
       // based on type read the block
       case AVRO_DATA_BLOCK:
+        Schema readerSchemaForBlock = readerSchema;
+        if (header != null) {
+          String schema = header.get(HeaderMetadataType.SCHEMA);
+          readerSchemaForBlock = schema != null ? new 
Schema.Parser().parse(schema) : readerSchema;
+        }
         if (nextBlockVersion.getVersion() == 
HoodieLogFormatVersion.DEFAULT_VERSION) {
-          return HoodieAvroDataBlock.getBlock(content, readerSchema);
+          return HoodieAvroDataBlock.getBlock(content, readerSchemaForBlock);
         } else {
           return HoodieAvroDataBlock
               .getBlock(logFile, inputStream, Optional.ofNullable(content), 
readBlockLazily,
-                  contentPosition, contentLength, blockEndPos, readerSchema, 
header, footer);
+                  contentPosition, contentLength, blockEndPos, 
readerSchemaForBlock, header, footer);
         }
       case DELETE_BLOCK:
         return HoodieDeleteBlock
diff --git 
a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java
 
b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java
index cc795ff..80b1d58 100644
--- 
a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java
+++ 
b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java
@@ -16,6 +16,7 @@
 
 package com.uber.hoodie.common.table.log;
 
+import static com.uber.hoodie.common.util.SchemaTestUtil.getEvolvedSchema;
 import static com.uber.hoodie.common.util.SchemaTestUtil.getSimpleSchema;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -1321,4 +1322,48 @@ public class HoodieLogFormatTest {
     assertFalse(reader.hasPrev());
     reader.close();
   }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testAppendWithSchemaEvolution() throws IOException, 
URISyntaxException, InterruptedException {
+    Writer writer = 
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+        
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
+        .overBaseCommit("100").withFs(fs).build();
+    Schema schema = getSimpleSchema();
+    List<IndexedRecord> records1 = SchemaTestUtil.generateTestRecords(0, 100);
+    List<IndexedRecord> copyOfRecords1 = records1.stream().map(
+        record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, 
schema)).collect(Collectors.toList());
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(copyOfRecords1, 
header);
+    writer = writer.appendBlock(dataBlock);
+    writer.close();
+
+    Schema evolvedSchema = getEvolvedSchema();
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, 
evolvedSchema.toString());
+    writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+        
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1").overBaseCommit("100")
+        .withFs(fs).build();
+    List<IndexedRecord> records2 = 
SchemaTestUtil.generateEvolvedTestRecords(0, 100);
+    List<IndexedRecord> copyOfRecords2 = records2.stream().map(
+        record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, 
evolvedSchema)).collect(Collectors.toList());
+    dataBlock = new HoodieAvroDataBlock(copyOfRecords2, header);
+    writer = writer.appendBlock(dataBlock);
+    writer.close();
+
+    // Pass the evolved schema as the latest schema
+    HoodieLogFileReader reader = new HoodieLogFileReader(fs, 
writer.getLogFile(), SchemaTestUtil.getEvolvedSchema(),
+        bufferSize, readBlocksLazily, true);
+
+    assertTrue("First block should be available", reader.hasNext());
+    // Read the 100 records from the first log block
+    assertEquals(((HoodieAvroDataBlock) reader.next()).getRecords().size(), 
100);
+
+    assertTrue("Second block should be available", reader.hasNext());
+    // Read the 100 records from the second log block
+    assertEquals(((HoodieAvroDataBlock) reader.next()).getRecords().size(), 
100);
+
+    reader.close();
+  }
 }

Reply via email to