This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d54b4b8  [HUDI-838] Support schema from HoodieCommitMetadata for 
HiveSync (#1559)
d54b4b8 is described below

commit d54b4b8a525868ea6d15e2e2cc6ffccc62d5c43c
Author: Udit Mehrotra <[email protected]>
AuthorDate: Thu May 7 16:33:09 2020 -0700

    [HUDI-838] Support schema from HoodieCommitMetadata for HiveSync (#1559)
    
    Co-authored-by: Mehrotra <[email protected]>
---
 .../java/org/apache/hudi/table/HoodieTable.java    |   2 +-
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  10 ++
 .../hudi/common/table/TableSchemaResolver.java     |  64 ++++++++++--
 .../org/apache/hudi/hive/HoodieHiveClient.java     |   7 +-
 .../org/apache/hudi/hive/TestHiveSyncTool.java     | 110 +++++++++++++++------
 .../test/java/org/apache/hudi/hive/TestUtil.java   |  63 +++++++++---
 6 files changed, 199 insertions(+), 57 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java 
b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 9904411..62509e4 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -535,7 +535,7 @@ public abstract class HoodieTable<T extends 
HoodieRecordPayload> implements Seri
     try {
       TableSchemaResolver schemaUtil = new 
TableSchemaResolver(getMetaClient());
       writerSchema = 
HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
-      tableSchema = 
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableSchemaFromCommitMetadata());
+      tableSchema = 
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields());
       isValid = TableSchemaResolver.isSchemaCompatible(tableSchema, 
writerSchema);
     } catch (Exception e) {
       throw new HoodieException("Failed to read schema/check compatibility for 
base path " + metaClient.getBasePath(), e);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index c3a9d96..d56b7d9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -171,6 +171,16 @@ public class HoodieAvroUtils {
     return mergedSchema;
   }
 
+  public static Schema removeMetadataFields(Schema schema) {
+    List<Schema.Field> filteredFields = schema.getFields()
+                                              .stream()
+                                              .filter(field -> 
!HoodieRecord.HOODIE_META_COLUMNS.contains(field.name()))
+                                              .collect(Collectors.toList());
+    Schema filteredSchema = Schema.createRecord(schema.getName(), 
schema.getDoc(), schema.getNamespace(), false);
+    filteredSchema.setFields(filteredFields);
+    return filteredSchema;
+  }
+
   public static String addMetadataColumnTypes(String hiveColumnTypes) {
     return "string,string,string,string,string," + hiveColumnTypes;
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 0bab862..129f85f 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -25,6 +25,7 @@ import org.apache.avro.Schema.Field;
 import org.apache.avro.SchemaCompatibility;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -36,6 +37,7 @@ import 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.InvalidTableException;
@@ -66,7 +68,7 @@ public class TableSchemaResolver {
    * @return Parquet schema for this table
    * @throws Exception
    */
-  public MessageType getDataSchema() throws Exception {
+  private MessageType getTableParquetSchemaFromDataFile() throws Exception {
     HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
 
     try {
@@ -139,29 +141,66 @@ public class TableSchemaResolver {
     }
   }
 
+  private Schema getTableAvroSchemaFromDataFile() throws Exception {
+    return convertParquetSchemaToAvro(getTableParquetSchemaFromDataFile());
+  }
+
   /**
-   * Gets the schema for a hoodie table in Avro format.
+   * Gets full schema (user + metadata) for a hoodie table in Avro format.
    *
    * @return Avro schema for this table
    * @throws Exception
    */
-  public Schema getTableSchema() throws Exception {
-    return convertParquetSchemaToAvro(getDataSchema());
+  public Schema getTableAvroSchema() throws Exception {
+    Option<Schema> schemaFromCommitMetadata = 
getTableSchemaFromCommitMetadata(true);
+    return schemaFromCommitMetadata.isPresent() ? 
schemaFromCommitMetadata.get() : getTableAvroSchemaFromDataFile();
+  }
+
+  /**
+   * Gets full schema (user + metadata) for a hoodie table in Parquet format.
+   *
+   * @return Parquet schema for the table
+   * @throws Exception
+   */
+  public MessageType getTableParquetSchema() throws Exception {
+    Option<Schema> schemaFromCommitMetadata = 
getTableSchemaFromCommitMetadata(true);
+    return schemaFromCommitMetadata.isPresent() ? 
convertAvroSchemaToParquet(schemaFromCommitMetadata.get()) :
+           getTableParquetSchemaFromDataFile();
+  }
+
+  /**
+   * Gets the user data schema (without Hudi metadata fields) for a hoodie table in Avro format.
+   *
+   * @return  Avro user data schema
+   * @throws Exception
+   */
+  public Schema getTableAvroSchemaWithoutMetadataFields() throws Exception {
+    Option<Schema> schemaFromCommitMetadata = 
getTableSchemaFromCommitMetadata(false);
+    return schemaFromCommitMetadata.isPresent() ? 
schemaFromCommitMetadata.get() :
+           
HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
   }
 
   /**
    * Gets the schema for a hoodie table in Avro format from the 
HoodieCommitMetadata of the last commit.
    *
    * @return Avro schema for this table
-   * @throws Exception
    */
-  public Schema getTableSchemaFromCommitMetadata() throws Exception {
+  private Option<Schema> getTableSchemaFromCommitMetadata(boolean 
includeMetadataFields) {
     try {
       HoodieTimeline timeline = 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
       byte[] data = 
timeline.getInstantDetails(timeline.lastInstant().get()).get();
       HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, 
HoodieCommitMetadata.class);
       String existingSchemaStr = 
metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
-      return new Schema.Parser().parse(existingSchemaStr);
+
+      if (StringUtils.isNullOrEmpty(existingSchemaStr)) {
+        return Option.empty();
+      }
+
+      Schema schema = new Schema.Parser().parse(existingSchemaStr);
+      if (includeMetadataFields) {
+        schema = HoodieAvroUtils.addMetadataFields(schema);
+      }
+      return Option.of(schema);
     } catch (Exception e) {
       throw new HoodieException("Failed to read schema from commit metadata", 
e);
     }
@@ -179,6 +218,17 @@ public class TableSchemaResolver {
   }
 
   /**
+   * Convert an Avro schema to the Parquet format.
+   *
+   * @param schema The avro schema to convert
+   * @return The converted parquet schema
+   */
+  public MessageType convertAvroSchemaToParquet(Schema schema) {
+    AvroSchemaConverter avroSchemaConverter = new 
AvroSchemaConverter(metaClient.getHadoopConf());
+    return avroSchemaConverter.convert(schema);
+  }
+
+  /**
    * HUDI specific validation of schema evolution. Ensures that a newer schema 
can be used for the dataset by
    * checking if the data written using the old schema can be read using the 
new schema.
    *
diff --git 
a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java 
b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 9f1a040..f1034e3 100644
--- a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -328,14 +328,15 @@ public class HoodieHiveClient {
   }
 
   /**
-   * Gets the schema for a hoodie table. Depending on the type of table, read 
from any file written in the latest
-   * commit. We will assume that the schema has not changed within a single 
atomic write.
+   * Gets the schema for a hoodie table. Depending on the type of table, try 
to read schema from commit metadata if
+   * present, else fallback to reading from any file written in the latest 
commit. We will assume that the schema has
+   * not changed within a single atomic write.
    *
    * @return Parquet schema for this table
    */
   public MessageType getDataSchema() {
     try {
-      return new TableSchemaResolver(metaClient).getDataSchema();
+      return new TableSchemaResolver(metaClient).getTableParquetSchema();
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to read data schema", e);
     }
diff --git 
a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java 
b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 14dfada..a883757 100644
--- a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.hive;
 
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SchemaTestUtil;
 import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent;
@@ -52,6 +53,10 @@ public class TestHiveSyncTool {
     return Stream.of(false, true);
   }
 
+  private static Iterable<Object[]> useJdbcAndSchemaFromCommitMetadata() {
+    return Arrays.asList(new Object[][] { { true, true }, { true, false }, { 
false, true }, { false, false } });
+  }
+
   @BeforeEach
   public void setUp() throws IOException, InterruptedException {
     TestUtil.setUp();
@@ -146,11 +151,11 @@ public class TestHiveSyncTool {
   }
 
   @ParameterizedTest
-  @MethodSource("useJdbc")
-  public void testBasicSync(boolean useJdbc) throws Exception {
+  @MethodSource({"useJdbcAndSchemaFromCommitMetadata"})
+  public void testBasicSync(boolean useJdbc, boolean 
useSchemaFromCommitMetadata) throws Exception {
     TestUtil.hiveSyncConfig.useJdbc = useJdbc;
     String instantTime = "100";
-    TestUtil.createCOWTable(instantTime, 5);
+    TestUtil.createCOWTable(instantTime, 5, useSchemaFromCommitMetadata);
     HoodieHiveClient hiveClient =
         new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), 
TestUtil.fileSystem);
     assertFalse(hiveClient.doesTableExist(TestUtil.hiveSyncConfig.tableName),
@@ -214,7 +219,7 @@ public class TestHiveSyncTool {
   public void testSyncIncremental(boolean useJdbc) throws Exception {
     TestUtil.hiveSyncConfig.useJdbc = useJdbc;
     String commitTime1 = "100";
-    TestUtil.createCOWTable(commitTime1, 5);
+    TestUtil.createCOWTable(commitTime1, 5, true);
     HoodieHiveClient hiveClient =
         new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), 
TestUtil.fileSystem);
     // Lets do the sync
@@ -228,7 +233,7 @@ public class TestHiveSyncTool {
     // Now lets create more parititions and these are the only ones which 
needs to be synced
     DateTime dateTime = DateTime.now().plusDays(6);
     String commitTime2 = "101";
-    TestUtil.addCOWPartitions(1, true, dateTime, commitTime2);
+    TestUtil.addCOWPartitions(1, true, true, dateTime, commitTime2);
 
     // Lets do the sync
     hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, 
TestUtil.getHiveConf(), TestUtil.fileSystem);
@@ -253,7 +258,7 @@ public class TestHiveSyncTool {
   public void testSyncIncrementalWithSchemaEvolution(boolean useJdbc) throws 
Exception {
     TestUtil.hiveSyncConfig.useJdbc = useJdbc;
     String commitTime1 = "100";
-    TestUtil.createCOWTable(commitTime1, 5);
+    TestUtil.createCOWTable(commitTime1, 5, true);
     HoodieHiveClient hiveClient =
         new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), 
TestUtil.fileSystem);
     // Lets do the sync
@@ -265,7 +270,7 @@ public class TestHiveSyncTool {
     // Now lets create more parititions and these are the only ones which 
needs to be synced
     DateTime dateTime = DateTime.now().plusDays(6);
     String commitTime2 = "101";
-    TestUtil.addCOWPartitions(1, false, dateTime, commitTime2);
+    TestUtil.addCOWPartitions(1, false, true, dateTime, commitTime2);
 
     // Lets do the sync
     tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), 
TestUtil.fileSystem);
@@ -286,12 +291,13 @@ public class TestHiveSyncTool {
   }
 
   @ParameterizedTest
-  @MethodSource("useJdbc")
-  public void testSyncMergeOnRead(boolean useJdbc) throws Exception {
+  @MethodSource("useJdbcAndSchemaFromCommitMetadata")
+  public void testSyncMergeOnRead(boolean useJdbc, boolean 
useSchemaFromCommitMetadata) throws Exception {
     TestUtil.hiveSyncConfig.useJdbc = useJdbc;
     String instantTime = "100";
     String deltaCommitTime = "101";
-    TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true);
+    TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true,
+                            useSchemaFromCommitMetadata);
 
     String roTableName = TestUtil.hiveSyncConfig.tableName + 
HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
     HoodieHiveClient hiveClient = new 
HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), 
TestUtil.fileSystem);
@@ -301,8 +307,19 @@ public class TestHiveSyncTool {
     tool.syncHoodieTable();
 
     assertTrue(hiveClient.doesTableExist(roTableName), "Table " + roTableName 
+ " should exist after sync completes");
-    assertEquals(hiveClient.getTableSchema(roTableName).size(), 
SchemaTestUtil.getSimpleSchema().getFields().size() + 1,
-        "Hive Schema should match the table schema + partition field");
+
+    if (useSchemaFromCommitMetadata) {
+      assertEquals(hiveClient.getTableSchema(roTableName).size(),
+                   SchemaTestUtil.getSimpleSchema().getFields().size() + 
TestUtil.hiveSyncConfig.partitionFields.size()
+                     + HoodieRecord.HOODIE_META_COLUMNS.size(),
+                   "Hive Schema should match the table schema + partition 
field");
+    } else {
+      // The data generated and schema in the data file do not have metadata 
columns, so we need a separate check.
+      assertEquals(hiveClient.getTableSchema(roTableName).size(),
+                   SchemaTestUtil.getSimpleSchema().getFields().size() + 
TestUtil.hiveSyncConfig.partitionFields.size(),
+                   "Hive Schema should match the table schema + partition 
field");
+    }
+
     assertEquals(5, hiveClient.scanTablePartitions(roTableName).size(),
         "Table partitions should match the number of partitions we wrote");
     assertEquals(deltaCommitTime, 
hiveClient.getLastCommitTimeSynced(roTableName).get(),
@@ -313,15 +330,25 @@ public class TestHiveSyncTool {
     String commitTime2 = "102";
     String deltaCommitTime2 = "103";
 
-    TestUtil.addCOWPartitions(1, true, dateTime, commitTime2);
-    TestUtil.addMORPartitions(1, true, false, dateTime, commitTime2, 
deltaCommitTime2);
+    TestUtil.addCOWPartitions(1, true, useSchemaFromCommitMetadata, dateTime, 
commitTime2);
+    TestUtil.addMORPartitions(1, true, false,
+        useSchemaFromCommitMetadata, dateTime, commitTime2, deltaCommitTime2);
     // Lets do the sync
     tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), 
TestUtil.fileSystem);
     tool.syncHoodieTable();
     hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, 
TestUtil.getHiveConf(), TestUtil.fileSystem);
 
-    assertEquals(hiveClient.getTableSchema(roTableName).size(), 
SchemaTestUtil.getEvolvedSchema().getFields().size() + 1,
-        "Hive Schema should match the evolved table schema + partition field");
+    if (useSchemaFromCommitMetadata) {
+      assertEquals(hiveClient.getTableSchema(roTableName).size(),
+                   SchemaTestUtil.getEvolvedSchema().getFields().size() + 
TestUtil.hiveSyncConfig.partitionFields.size()
+                     + HoodieRecord.HOODIE_META_COLUMNS.size(),
+                   "Hive Schema should match the evolved table schema + 
partition field");
+    } else {
+      // The data generated and schema in the data file do not have metadata 
columns, so we need a separate check.
+      assertEquals(hiveClient.getTableSchema(roTableName).size(),
+                   SchemaTestUtil.getEvolvedSchema().getFields().size() + 
TestUtil.hiveSyncConfig.partitionFields.size(),
+                   "Hive Schema should match the evolved table schema + 
partition field");
+    }
     // Sync should add the one partition
     assertEquals(6, hiveClient.scanTablePartitions(roTableName).size(),
         "The 2 partitions we wrote should be added to hive");
@@ -330,13 +357,13 @@ public class TestHiveSyncTool {
   }
 
   @ParameterizedTest
-  @MethodSource("useJdbc")
-  public void testSyncMergeOnReadRT(boolean useJdbc) throws Exception {
+  @MethodSource("useJdbcAndSchemaFromCommitMetadata")
+  public void testSyncMergeOnReadRT(boolean useJdbc, boolean 
useSchemaFromCommitMetadata) throws Exception {
     TestUtil.hiveSyncConfig.useJdbc = useJdbc;
     String instantTime = "100";
     String deltaCommitTime = "101";
     String snapshotTableName = TestUtil.hiveSyncConfig.tableName + 
HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
-    TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true);
+    TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true, 
useSchemaFromCommitMetadata);
     HoodieHiveClient hiveClientRT =
         new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), 
TestUtil.fileSystem);
 
@@ -352,8 +379,18 @@ public class TestHiveSyncTool {
         "Table " + TestUtil.hiveSyncConfig.tableName + 
HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
             + " should exist after sync completes");
 
-    assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), 
SchemaTestUtil.getSimpleSchema().getFields().size() + 1,
-        "Hive Schema should match the table schema + partition field");
+    if (useSchemaFromCommitMetadata) {
+      assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
+                   SchemaTestUtil.getSimpleSchema().getFields().size() + 
TestUtil.hiveSyncConfig.partitionFields.size()
+                     + HoodieRecord.HOODIE_META_COLUMNS.size(),
+                   "Hive Schema should match the table schema + partition 
field");
+    } else {
+      // The data generated and schema in the data file do not have metadata 
columns, so we need a separate check.
+      assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
+                   SchemaTestUtil.getSimpleSchema().getFields().size() + 
TestUtil.hiveSyncConfig.partitionFields.size(),
+                   "Hive Schema should match the table schema + partition 
field");
+    }
+
     assertEquals(5, hiveClientRT.scanTablePartitions(snapshotTableName).size(),
         "Table partitions should match the number of partitions we wrote");
     assertEquals(deltaCommitTime, 
hiveClientRT.getLastCommitTimeSynced(snapshotTableName).get(),
@@ -364,15 +401,24 @@ public class TestHiveSyncTool {
     String commitTime2 = "102";
     String deltaCommitTime2 = "103";
 
-    TestUtil.addCOWPartitions(1, true, dateTime, commitTime2);
-    TestUtil.addMORPartitions(1, true, false, dateTime, commitTime2, 
deltaCommitTime2);
+    TestUtil.addCOWPartitions(1, true, useSchemaFromCommitMetadata, dateTime, 
commitTime2);
+    TestUtil.addMORPartitions(1, true, false, useSchemaFromCommitMetadata, 
dateTime, commitTime2, deltaCommitTime2);
     // Lets do the sync
     tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), 
TestUtil.fileSystem);
     tool.syncHoodieTable();
     hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, 
TestUtil.getHiveConf(), TestUtil.fileSystem);
 
-    assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), 
SchemaTestUtil.getEvolvedSchema().getFields().size() + 1,
-        "Hive Schema should match the evolved table schema + partition field");
+    if (useSchemaFromCommitMetadata) {
+      assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
+                   SchemaTestUtil.getEvolvedSchema().getFields().size() + 
TestUtil.hiveSyncConfig.partitionFields.size()
+                     + HoodieRecord.HOODIE_META_COLUMNS.size(),
+                   "Hive Schema should match the evolved table schema + 
partition field");
+    } else {
+      // The data generated and schema in the data file do not have metadata 
columns, so we need a separate check.
+      assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
+                   SchemaTestUtil.getEvolvedSchema().getFields().size() + 
TestUtil.hiveSyncConfig.partitionFields.size(),
+                   "Hive Schema should match the evolved table schema + 
partition field");
+    }
     // Sync should add the one partition
     assertEquals(6, hiveClientRT.scanTablePartitions(snapshotTableName).size(),
         "The 2 partitions we wrote should be added to hive");
@@ -385,7 +431,7 @@ public class TestHiveSyncTool {
   public void testMultiPartitionKeySync(boolean useJdbc) throws Exception {
     TestUtil.hiveSyncConfig.useJdbc = useJdbc;
     String instantTime = "100";
-    TestUtil.createCOWTable(instantTime, 5);
+    TestUtil.createCOWTable(instantTime, 5, true);
 
     HiveSyncConfig hiveSyncConfig = 
HiveSyncConfig.copy(TestUtil.hiveSyncConfig);
     hiveSyncConfig.partitionValueExtractorClass = 
MultiPartKeysValueExtractor.class.getCanonicalName();
@@ -416,7 +462,7 @@ public class TestHiveSyncTool {
     TestUtil.hiveSyncConfig.useJdbc = useJdbc;
     String commitTime = "100";
     String snapshotTableName = TestUtil.hiveSyncConfig.tableName + 
HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
-    TestUtil.createMORTable(commitTime, "", 5, false);
+    TestUtil.createMORTable(commitTime, "", 5, false, true);
     HoodieHiveClient hiveClientRT =
         new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), 
TestUtil.fileSystem);
 
@@ -431,7 +477,9 @@ public class TestHiveSyncTool {
         + " should exist after sync completes");
 
     // Schema being read from compacted base files
-    assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), 
SchemaTestUtil.getSimpleSchema().getFields().size() + 1,
+    assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
+                 SchemaTestUtil.getSimpleSchema().getFields().size() + 
TestUtil.hiveSyncConfig.partitionFields.size()
+                   + HoodieRecord.HOODIE_META_COLUMNS.size(),
         "Hive Schema should match the table schema + partition field");
     assertEquals(5, 
hiveClientRT.scanTablePartitions(snapshotTableName).size(), "Table partitions 
should match the number of partitions we wrote");
 
@@ -440,14 +488,16 @@ public class TestHiveSyncTool {
     String commitTime2 = "102";
     String deltaCommitTime2 = "103";
 
-    TestUtil.addMORPartitions(1, true, false, dateTime, commitTime2, 
deltaCommitTime2);
+    TestUtil.addMORPartitions(1, true, false, true, dateTime, commitTime2, 
deltaCommitTime2);
     // Lets do the sync
     tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), 
TestUtil.fileSystem);
     tool.syncHoodieTable();
     hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, 
TestUtil.getHiveConf(), TestUtil.fileSystem);
 
     // Schema being read from the log files
-    assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), 
SchemaTestUtil.getEvolvedSchema().getFields().size() + 1,
+    assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
+                 SchemaTestUtil.getEvolvedSchema().getFields().size() + 
TestUtil.hiveSyncConfig.partitionFields.size()
+                   + HoodieRecord.HOODIE_META_COLUMNS.size(),
         "Hive Schema should match the evolved table schema + partition field");
     // Sync should add the one partition
     assertEquals(6, 
hiveClientRT.scanTablePartitions(snapshotTableName).size(), "The 1 partition we 
wrote should be added to hive");
diff --git a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java 
b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java
index 86c78fb..960a010 100644
--- a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java
+++ b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java
@@ -155,7 +155,7 @@ public class TestUtil {
     }
   }
 
-  static void createCOWTable(String instantTime, int numberOfPartitions)
+  static void createCOWTable(String instantTime, int numberOfPartitions, 
boolean useSchemaFromCommitMetadata)
       throws IOException, URISyntaxException {
     Path path = new Path(hiveSyncConfig.basePath);
     FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
@@ -164,13 +164,15 @@ public class TestUtil {
     boolean result = fileSystem.mkdirs(path);
     checkResult(result);
     DateTime dateTime = DateTime.now();
-    HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, 
true, dateTime, instantTime);
+    HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, 
true,
+        useSchemaFromCommitMetadata, dateTime, instantTime);
     createdTablesSet.add(hiveSyncConfig.databaseName + "." + 
hiveSyncConfig.tableName);
     createCommitFile(commitMetadata, instantTime);
   }
 
   static void createMORTable(String commitTime, String deltaCommitTime, int 
numberOfPartitions,
-      boolean createDeltaCommit) throws IOException, URISyntaxException, 
InterruptedException {
+      boolean createDeltaCommit, boolean useSchemaFromCommitMetadata)
+      throws IOException, URISyntaxException, InterruptedException {
     Path path = new Path(hiveSyncConfig.basePath);
     FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
     HoodieTableMetaClient.initTableType(configuration, 
hiveSyncConfig.basePath, HoodieTableType.MERGE_ON_READ,
@@ -179,46 +181,54 @@ public class TestUtil {
     boolean result = fileSystem.mkdirs(path);
     checkResult(result);
     DateTime dateTime = DateTime.now();
-    HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, 
true, dateTime, commitTime);
+    HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, 
true,
+        useSchemaFromCommitMetadata, dateTime, commitTime);
     createdTablesSet.add(hiveSyncConfig.databaseName + "." + 
hiveSyncConfig.tableName);
     createdTablesSet
         .add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + 
HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
     HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
     commitMetadata.getPartitionToWriteStats()
         .forEach((key, value) -> value.forEach(l -> 
compactionMetadata.addWriteStat(key, l)));
+    addSchemaToCommitMetadata(compactionMetadata, 
commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY),
+                             useSchemaFromCommitMetadata);
     createCompactionCommitFile(compactionMetadata, commitTime);
     if (createDeltaCommit) {
       // Write a delta commit
-      HoodieCommitMetadata deltaMetadata = 
createLogFiles(commitMetadata.getPartitionToWriteStats(), true);
+      HoodieCommitMetadata deltaMetadata = 
createLogFiles(commitMetadata.getPartitionToWriteStats(), true,
+                                                          
useSchemaFromCommitMetadata);
       createDeltaCommitFile(deltaMetadata, deltaCommitTime);
     }
   }
 
-  static void addCOWPartitions(int numberOfPartitions, boolean 
isParquetSchemaSimple, DateTime startFrom,
-                               String instantTime) throws IOException, 
URISyntaxException {
+  static void addCOWPartitions(int numberOfPartitions, boolean 
isParquetSchemaSimple,
+      boolean useSchemaFromCommitMetadata, DateTime startFrom, String 
instantTime) throws IOException, URISyntaxException {
     HoodieCommitMetadata commitMetadata =
-        createPartitions(numberOfPartitions, isParquetSchemaSimple, startFrom, 
instantTime);
+        createPartitions(numberOfPartitions, isParquetSchemaSimple, 
useSchemaFromCommitMetadata, startFrom, instantTime);
     createdTablesSet.add(hiveSyncConfig.databaseName + "." + 
hiveSyncConfig.tableName);
     createCommitFile(commitMetadata, instantTime);
   }
 
   static void addMORPartitions(int numberOfPartitions, boolean 
isParquetSchemaSimple, boolean isLogSchemaSimple,
-                               DateTime startFrom, String instantTime, String 
deltaCommitTime)
+      boolean useSchemaFromCommitMetadata, DateTime startFrom, String 
instantTime, String deltaCommitTime)
       throws IOException, URISyntaxException, InterruptedException {
-    HoodieCommitMetadata commitMetadata =
-        createPartitions(numberOfPartitions, isParquetSchemaSimple, startFrom, 
instantTime);
+    HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, 
isParquetSchemaSimple,
+        useSchemaFromCommitMetadata, startFrom, instantTime);
     createdTablesSet.add(hiveSyncConfig.databaseName + "." + 
hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE);
     createdTablesSet.add(hiveSyncConfig.databaseName + "." + 
hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
     HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
     commitMetadata.getPartitionToWriteStats()
         .forEach((key, value) -> value.forEach(l -> 
compactionMetadata.addWriteStat(key, l)));
+    addSchemaToCommitMetadata(compactionMetadata, 
commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY),
+                             useSchemaFromCommitMetadata);
     createCompactionCommitFile(compactionMetadata, instantTime);
-    HoodieCommitMetadata deltaMetadata = 
createLogFiles(commitMetadata.getPartitionToWriteStats(), isLogSchemaSimple);
+    HoodieCommitMetadata deltaMetadata = 
createLogFiles(commitMetadata.getPartitionToWriteStats(), isLogSchemaSimple,
+        useSchemaFromCommitMetadata);
     createDeltaCommitFile(deltaMetadata, deltaCommitTime);
   }
 
   private static HoodieCommitMetadata createLogFiles(Map<String, 
List<HoodieWriteStat>> partitionWriteStats,
-                                                     boolean 
isLogSchemaSimple) throws InterruptedException, IOException, URISyntaxException 
{
+      boolean isLogSchemaSimple, boolean useSchemaFromCommitMetadata)
+      throws InterruptedException, IOException, URISyntaxException {
     HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
     for (Entry<String, List<HoodieWriteStat>> wEntry : 
partitionWriteStats.entrySet()) {
       String partitionPath = wEntry.getKey();
@@ -232,11 +242,12 @@ public class TestUtil {
         commitMetadata.addWriteStat(partitionPath, writeStat);
       }
     }
+    addSchemaToCommitMetadata(commitMetadata, isLogSchemaSimple, 
useSchemaFromCommitMetadata);
     return commitMetadata;
   }
 
   private static HoodieCommitMetadata createPartitions(int numberOfPartitions, 
boolean isParquetSchemaSimple,
-                                                       DateTime startFrom, 
String instantTime) throws IOException, URISyntaxException {
+      boolean useSchemaFromCommitMetadata, DateTime startFrom, String 
instantTime) throws IOException, URISyntaxException {
     startFrom = startFrom.withTimeAtStartOfDay();
 
     HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
@@ -249,6 +260,7 @@ public class TestUtil {
       startFrom = startFrom.minusDays(1);
       writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s));
     }
+    addSchemaToCommitMetadata(commitMetadata, isParquetSchemaSimple, 
useSchemaFromCommitMetadata);
     return commitMetadata;
   }
 
@@ -271,7 +283,7 @@ public class TestUtil {
   @SuppressWarnings({"unchecked", "deprecation"})
   private static void generateParquetData(Path filePath, boolean 
isParquetSchemaSimple)
       throws IOException, URISyntaxException {
-    Schema schema = (isParquetSchemaSimple ? SchemaTestUtil.getSimpleSchema() 
: SchemaTestUtil.getEvolvedSchema());
+    Schema schema = getTestDataSchema(isParquetSchemaSimple);
     org.apache.parquet.schema.MessageType parquetSchema = new 
AvroSchemaConverter().convert(schema);
     BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, -1,
         BloomFilterTypeCode.SIMPLE.name());
@@ -294,7 +306,7 @@ public class TestUtil {
 
   private static HoodieLogFile generateLogData(Path parquetFilePath, boolean 
isLogSchemaSimple)
       throws IOException, InterruptedException, URISyntaxException {
-    Schema schema = (isLogSchemaSimple ? SchemaTestUtil.getSimpleSchema() : 
SchemaTestUtil.getEvolvedSchema());
+    Schema schema = getTestDataSchema(isLogSchemaSimple);
     HoodieBaseFile dataFile = new 
HoodieBaseFile(fileSystem.getFileStatus(parquetFilePath));
     // Write a log file for this parquet file
     Writer logWriter = 
HoodieLogFormat.newWriterBuilder().onParentPath(parquetFilePath.getParent())
@@ -311,6 +323,25 @@ public class TestUtil {
     return logWriter.getLogFile();
   }
 
+  private static Schema getTestDataSchema(boolean isSimpleSchema) throws 
IOException {
+    return isSimpleSchema ? SchemaTestUtil.getSimpleSchema() : 
SchemaTestUtil.getEvolvedSchema();
+  }
+
+  private static void addSchemaToCommitMetadata(HoodieCommitMetadata 
commitMetadata, boolean isSimpleSchema,
+      boolean useSchemaFromCommitMetadata) throws IOException {
+    if (useSchemaFromCommitMetadata) {
+      Schema dataSchema = getTestDataSchema(isSimpleSchema);
+      commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, 
dataSchema.toString());
+    }
+  }
+
+  private static void addSchemaToCommitMetadata(HoodieCommitMetadata 
commitMetadata, String schema,
+      boolean useSchemaFromCommitMetadata) {
+    if (useSchemaFromCommitMetadata) {
+      commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schema);
+    }
+  }
+
   private static void checkResult(boolean result) {
     if (!result) {
       throw new JUnitException("Could not initialize");

Reply via email to