This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 19d29ac  [HUDI-741] Added checks to validate Hoodie's schema evolution.
19d29ac is described below

commit 19d29ac7d0ac451324d25bfe85b51b1eca10bc67
Author: Prashant Wason <[email protected]>
AuthorDate: Fri Mar 27 00:53:49 2020 -0700

    [HUDI-741] Added checks to validate Hoodie's schema evolution.
    
    HUDI specific validation of schema evolution should ensure that a newer 
schema can be used for the dataset by checking that the data written using the 
old schema can be read using the new schema.
    
    Code changes:
    
    1. Added a new config in HoodieWriteConfig to enable schema validation 
check (disabled by default)
    2. Moved code that reads schema from base/log files into hudi-common from 
hudi-hive-sync
    3. Added writerSchema to the extraMetadata of compaction commits in MOR 
table. This is the same as what is done for commits on a COW table.
    
    Testing changes:
    
    4. Extended TestHoodieClientBase to add insertBatch API which allows 
inserting a new batch of unique records into a HUDI table
    5. Added a unit test to verify schema evolution for both COW and MOR tables.
    6. Added unit tests for schema compatibility checks.
---
 .../hudi/cli/commands/HoodieLogFileCommand.java    |   8 +-
 .../org/apache/hudi/client/HoodieWriteClient.java  |  64 ++-
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  12 +
 .../org/apache/hudi/io/HoodieAppendHandle.java     |   2 -
 .../java/org/apache/hudi/io/HoodieWriteHandle.java |   6 +-
 .../apache/hudi/client/TestHoodieClientBase.java   |  28 ++
 .../TestHoodieClientOnCopyOnWriteStorage.java      |   6 +-
 .../hudi/client/TestTableSchemaEvolution.java      | 487 +++++++++++++++++++++
 .../hudi/common/HoodieTestDataGenerator.java       |  19 +-
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |   8 +
 .../hudi/common/table/TableSchemaResolver.java     | 360 +++++++++++++++
 .../java/org/apache/hudi/hive/HiveSyncTool.java    |   4 +-
 .../org/apache/hudi/hive/HoodieHiveClient.java     | 129 +-----
 .../util/{SchemaUtil.java => HiveSchemaUtil.java}  |  33 +-
 .../org/apache/hudi/hive/TestHiveSyncTool.java     |  18 +-
 15 files changed, 1004 insertions(+), 180 deletions(-)

diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 0d67da7..db1ab16 100644
--- 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
@@ -36,7 +37,6 @@ import 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
-import org.apache.hudi.hive.util.SchemaUtil;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.avro.Schema;
@@ -90,7 +90,7 @@ public class HoodieLogFileCommand implements CommandMarker {
     for (String logFilePath : logFilePaths) {
       FileStatus[] fsStatus = fs.listStatus(new Path(logFilePath));
       Schema writerSchema = new AvroSchemaConverter()
-          .convert(Objects.requireNonNull(SchemaUtil.readSchemaFromLogFile(fs, 
new Path(logFilePath))));
+          
.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(fs, 
new Path(logFilePath))));
       Reader reader = HoodieLogFormat.newReader(fs, new 
HoodieLogFile(fsStatus[0].getPath()), writerSchema);
 
       // read the avro blocks
@@ -179,7 +179,7 @@ public class HoodieLogFileCommand implements CommandMarker {
     AvroSchemaConverter converter = new AvroSchemaConverter();
     // get schema from last log file
     Schema readerSchema =
-        
converter.convert(Objects.requireNonNull(SchemaUtil.readSchemaFromLogFile(fs, 
new Path(logFilePaths.get(logFilePaths.size() - 1)))));
+        
converter.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(fs,
 new Path(logFilePaths.get(logFilePaths.size() - 1)))));
 
     List<IndexedRecord> allRecords = new ArrayList<>();
 
@@ -202,7 +202,7 @@ public class HoodieLogFileCommand implements CommandMarker {
     } else {
       for (String logFile : logFilePaths) {
         Schema writerSchema = new AvroSchemaConverter()
-            
.convert(Objects.requireNonNull(SchemaUtil.readSchemaFromLogFile(client.getFs(),
 new Path(logFile))));
+            
.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(client.getFs(),
 new Path(logFile))));
         HoodieLogFormat.Reader reader =
             HoodieLogFormat.newReader(fs, new HoodieLogFile(new 
Path(logFile)), writerSchema);
         // read the avro blocks
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java 
b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index 1fd34a3..c7de8df 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -19,6 +19,8 @@
 package org.apache.hudi.client;
 
 import com.codahale.metrics.Timer;
+import org.apache.avro.Schema;
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -36,6 +38,7 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
@@ -50,9 +53,11 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieCompactionException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.exception.HoodieRestoreException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.exception.HoodieSavepointException;
+import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.metrics.HoodieMetrics;
 import org.apache.hudi.table.HoodieCommitArchiveLog;
@@ -166,6 +171,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    */
   public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final 
String instantTime) {
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT);
+    validateSchema(table, true);
     setOperationType(WriteOperationType.UPSERT);
     HoodieWriteMetadata result = table.upsert(jsc,instantTime, records);
     if (result.getIndexLookupDuration().isPresent()) {
@@ -185,6 +191,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    */
   public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> 
preppedRecords, final String instantTime) {
     HoodieTable<T> table = 
getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED);
+    validateSchema(table, true);
     setOperationType(WriteOperationType.UPSERT_PREPPED);
     HoodieWriteMetadata result = table.upsertPrepped(jsc,instantTime, 
preppedRecords);
     return postWrite(result, instantTime, table);
@@ -202,6 +209,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    */
   public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final 
String instantTime) {
     HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT);
+    validateSchema(table, false);
     setOperationType(WriteOperationType.INSERT);
     HoodieWriteMetadata result = table.insert(jsc,instantTime, records);
     return postWrite(result, instantTime, table);
@@ -220,6 +228,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    */
   public JavaRDD<WriteStatus> insertPreppedRecords(JavaRDD<HoodieRecord<T>> 
preppedRecords, final String instantTime) {
     HoodieTable<T> table = 
getTableAndInitCtx(WriteOperationType.INSERT_PREPPED);
+    validateSchema(table, false);
     setOperationType(WriteOperationType.INSERT_PREPPED);
     HoodieWriteMetadata result = table.insertPrepped(jsc,instantTime, 
preppedRecords);
     return postWrite(result, instantTime, table);
@@ -882,6 +891,8 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
       metadata.addWriteStat(stat.getPartitionPath(), stat);
     }
 
+    metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
+
     // Finalize write
     finalizeWrite(table, compactionCommitTime, updateStatusMap);
 
@@ -919,4 +930,55 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
     });
     return compactionInstantTimeOpt;
   }
-}
+
+  /**
+   * Ensure that the current writerSchema is compatible with the latest schema 
of this dataset.
+   *
+   * When inserting/updating data, we read records using the last used schema 
and convert them to the
+   * GenericRecords with writerSchema. Hence, we need to ensure that this 
conversion can take place without errors.
+   *
+   * @param hoodieTable The Hoodie Table
+   * @param isUpsert If this is a check during upserts
+   * @throws HoodieUpsertException If schema check fails during upserts
+   * @throws HoodieInsertException If schema check fails during inserts
+   */
+  private void validateSchema(HoodieTable<T> hoodieTable, final boolean 
isUpsert)
+      throws HoodieUpsertException, HoodieInsertException {
+
+    if (!getConfig().getAvroSchemaValidate()) {
+      // Check not required
+      return;
+    }
+
+    boolean isValid = false;
+    String errorMsg = "WriterSchema is not compatible with the schema present 
in the Table";
+    Throwable internalError = null;
+    Schema tableSchema = null;
+    Schema writerSchema = null;
+    try {
+      TableSchemaResolver schemaUtil = new 
TableSchemaResolver(hoodieTable.getMetaClient());
+      writerSchema = 
HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
+      tableSchema = 
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableSchemaFromCommitMetadata());
+      isValid = schemaUtil.isSchemaCompatible(tableSchema, writerSchema);
+    } catch (Exception e) {
+      // Two error cases are possible:
+      // 1. There was no schema as no data has been inserted yet (first time 
only)
+      // 2. Failure in reading the schema
+      isValid = 
hoodieTable.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants()
 == 0;
+      errorMsg = "Failed to read latest schema on path " + basePath;
+      internalError = e;
+    }
+
+    if (!isValid) {
+      LOG.error(errorMsg);
+      LOG.warn("WriterSchema: " + writerSchema);
+      LOG.warn("Table latest schema: " + tableSchema);
+      if (isUpsert) {
+        throw new HoodieUpsertException(errorMsg, internalError);
+      } else {
+        throw new HoodieInsertException(errorMsg, internalError);
+      }
+    }
+  }
+
+}
\ No newline at end of file
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java 
b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 3db17c7..5ac87da 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -52,6 +52,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   private static final String TIMELINE_LAYOUT_VERSION = 
"hoodie.timeline.layout.version";
   private static final String BASE_PATH_PROP = "hoodie.base.path";
   private static final String AVRO_SCHEMA = "hoodie.avro.schema";
+  private static final String AVRO_SCHEMA_VALIDATE = 
"hoodie.avro.schema.validate";
+  private static final String DEFAULT_AVRO_SCHEMA_VALIDATE = "false";
   private static final String DEFAULT_PARALLELISM = "1500";
   private static final String INSERT_PARALLELISM = 
"hoodie.insert.shuffle.parallelism";
   private static final String BULKINSERT_PARALLELISM = 
"hoodie.bulkinsert.shuffle.parallelism";
@@ -131,6 +133,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig 
{
     props.setProperty(AVRO_SCHEMA, schemaStr);
   }
 
+  public boolean getAvroSchemaValidate() {
+    return Boolean.parseBoolean(props.getProperty(AVRO_SCHEMA_VALIDATE));
+  }
+
   public String getTableName() {
     return props.getProperty(TABLE_NAME);
   }
@@ -577,6 +583,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig 
{
       return this;
     }
 
+    public Builder withAvroSchemaValidate(boolean enable) {
+      props.setProperty(AVRO_SCHEMA_VALIDATE, String.valueOf(enable));
+      return this;
+    }
+
     public Builder forTable(String tableName) {
       props.setProperty(TABLE_NAME, tableName);
       return this;
@@ -721,6 +732,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
           String.valueOf(DEFAULT_MAX_CONSISTENCY_CHECKS));
       setDefaultOnCondition(props, 
!props.containsKey(FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP),
           FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP, 
DEFAULT_FAIL_ON_TIMELINE_ARCHIVING_ENABLED);
+      setDefaultOnCondition(props, !props.containsKey(AVRO_SCHEMA_VALIDATE), 
AVRO_SCHEMA_VALIDATE, DEFAULT_AVRO_SCHEMA_VALIDATE);
 
       // Make sure the props is propagated
       setDefaultOnCondition(props, !isIndexConfigSet, 
HoodieIndexConfig.newBuilder().fromProperties(props).build());
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java 
b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 980d731..cc91e89 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -191,8 +191,6 @@ public class HoodieAppendHandle<T extends 
HoodieRecordPayload> extends HoodieWri
     return Option.empty();
   }
 
-  // TODO (NA) - Perform a writerSchema check of current input record with the 
last writerSchema on log file
-  // to make sure we don't append records with older (shorter) writerSchema 
than already appended
   public void doAppend() {
     while (recordItr.hasNext()) {
       HoodieRecord record = recordItr.next();
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java 
b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 0b1506d..08bb06e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -63,7 +63,7 @@ public abstract class HoodieWriteHandle<T extends 
HoodieRecordPayload> extends H
     this.partitionPath = partitionPath;
     this.fileId = fileId;
     this.originalSchema = new Schema.Parser().parse(config.getSchema());
-    this.writerSchema = createHoodieWriteSchema(originalSchema);
+    this.writerSchema = 
HoodieAvroUtils.createHoodieWriteSchema(originalSchema);
     this.timer = new HoodieTimer().startTimer();
     this.writeStatus = (WriteStatus) 
ReflectionUtils.loadClass(config.getWriteStatusClassName(),
         !hoodieTable.getIndex().isImplicitWithStorage(), 
config.getWriteStatusFailureFraction());
@@ -78,10 +78,6 @@ public abstract class HoodieWriteHandle<T extends 
HoodieRecordPayload> extends H
     return FSUtils.makeWriteToken(getPartitionId(), getStageId(), 
getAttemptId());
   }
 
-  public static Schema createHoodieWriteSchema(Schema originalSchema) {
-    return HoodieAvroUtils.addMetadataFields(originalSchema);
-  }
-
   public Path makeNewPath(String partitionPath) {
     Path path = FSUtils.getPartitionPath(config.getBasePath(), partitionPath);
     try {
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java 
b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
index ee76ed3..4015735 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
@@ -342,6 +342,34 @@ public class TestHoodieClientBase extends 
HoodieClientTestHarness {
   }
 
   /**
+   * Helper to insert another batch of records and do regular assertions on 
the state after successful completion.
+   *
+   * @param writeConfig            Hoodie Write Config
+   * @param client                 Hoodie Write Client
+   * @param newCommitTime          New Commit Timestamp to be used
+   * @param initCommitTime         Begin Timestamp (usually "000")
+   * @param numRecordsInThisCommit Number of records to be added in the new 
commit
+   * @param writeFn                Write Function to be used for insertion
+   * @param isPreppedAPI           Boolean flag to indicate writeFn expects 
prepped records
+   * @param assertForCommit        Enable Assertion of Writes
+   * @param expRecordsInThisCommit Expected number of records in this commit
+   * @param expTotalRecords        Expected number of records when scanned
+   * @param expTotalCommits        Expected number of commits (including this 
commit)
+   * @return RDD of write-status
+   * @throws Exception in case of error
+   */
+  JavaRDD<WriteStatus> insertBatch(HoodieWriteConfig writeConfig, 
HoodieWriteClient client, String newCommitTime,
+                                   String initCommitTime, int 
numRecordsInThisCommit,
+                                   Function3<JavaRDD<WriteStatus>, 
HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
+                                   boolean assertForCommit, int 
expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws 
Exception {
+    final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
+        generateWrapRecordsFn(isPreppedAPI, writeConfig, 
dataGen::generateInserts);
+
+    return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), 
initCommitTime, numRecordsInThisCommit,
+        recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, 
expTotalRecords, expTotalCommits);
+  }
+
+  /**
    * Helper to upsert batch of records and do regular assertions on the state 
after successful completion.
    *
    * @param writeConfig                  Hoodie Write Config
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
 
b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index 7382b7e..0899474 100644
--- 
a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ 
b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -199,15 +199,15 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
TestHoodieClientBase {
     String recordKey = UUID.randomUUID().toString();
     HoodieKey keyOne = new HoodieKey(recordKey, "2018-01-01");
     HoodieRecord<TestRawTripPayload> recordOne =
-        new HoodieRecord(keyOne, 
HoodieTestDataGenerator.generateRandomValue(keyOne, newCommitTime));
+        new HoodieRecord(keyOne, dataGen.generateRandomValue(keyOne, 
newCommitTime));
 
     HoodieKey keyTwo = new HoodieKey(recordKey, "2018-02-01");
     HoodieRecord recordTwo =
-        new HoodieRecord(keyTwo, 
HoodieTestDataGenerator.generateRandomValue(keyTwo, newCommitTime));
+        new HoodieRecord(keyTwo, dataGen.generateRandomValue(keyTwo, 
newCommitTime));
 
     // Same key and partition as keyTwo
     HoodieRecord recordThree =
-        new HoodieRecord(keyTwo, 
HoodieTestDataGenerator.generateRandomValue(keyTwo, newCommitTime));
+        new HoodieRecord(keyTwo, dataGen.generateRandomValue(keyTwo, 
newCommitTime));
 
     JavaRDD<HoodieRecord<TestRawTripPayload>> records =
         jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1);
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
 
b/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
new file mode 100644
index 0000000..7b75d5a
--- /dev/null
+++ 
b/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.TestRawTripPayload;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.index.HoodieIndex.IndexType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.HoodieTestDataGenerator.FARE_NESTED_SCHEMA;
+import static org.apache.hudi.common.HoodieTestDataGenerator.MAP_TYPE_SCHEMA;
+import static org.apache.hudi.common.HoodieTestDataGenerator.TIP_NESTED_SCHEMA;
+import static 
org.apache.hudi.common.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static 
org.apache.hudi.common.HoodieTestDataGenerator.TRIP_SCHEMA_PREFIX;
+import static 
org.apache.hudi.common.HoodieTestDataGenerator.TRIP_SCHEMA_SUFFIX;
+import static 
org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_1;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestTableSchemaEvolution extends TestHoodieClientBase {
+  private final String initCommitTime = "000";
+  private HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
+  private HoodieTestDataGenerator dataGenEvolved = new 
HoodieTestDataGenerator();
+  private HoodieTestDataGenerator dataGenDevolved = new 
HoodieTestDataGenerator();
+
+  public static final String EXTRA_FIELD_SCHEMA =
+      "{\"name\": \"new_field\", \"type\": \"boolean\", \"default\": false},";
+
+  // TRIP_EXAMPLE_SCHEMA with a new_field added
+  public static final String TRIP_EXAMPLE_SCHEMA_EVOLVED = TRIP_SCHEMA_PREFIX 
+ MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA
+      + TIP_NESTED_SCHEMA + EXTRA_FIELD_SCHEMA + TRIP_SCHEMA_SUFFIX;
+
+  // TRIP_EXAMPLE_SCHEMA with tip field removed
+  public static final String TRIP_EXAMPLE_SCHEMA_DEVOLVED = TRIP_SCHEMA_PREFIX 
+ MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA
+      + TRIP_SCHEMA_SUFFIX;
+
+  @Before
+  public void setUp() throws Exception {
+    initResources();
+  }
+
+  @After
+  public void tearDown() {
+    cleanupSparkContexts();
+  }
+
+  @Test
+  public void testSchemaCompatibilityBasic() throws Exception {
+    assertTrue("Same schema is compatible",
+               TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA));
+
+    String reorderedSchema = TRIP_SCHEMA_PREFIX  + TIP_NESTED_SCHEMA + 
FARE_NESTED_SCHEMA
+        + MAP_TYPE_SCHEMA + TRIP_SCHEMA_SUFFIX;
+    assertTrue("Reordered fields are compatible",
+        TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, 
reorderedSchema));
+    assertTrue("Reordered fields are compatible",
+        TableSchemaResolver.isSchemaCompatible(reorderedSchema, 
TRIP_EXAMPLE_SCHEMA));
+
+    String renamedSchema = TRIP_EXAMPLE_SCHEMA.replace("tip_history", 
"tip_future");
+    assertFalse("Renamed fields are not compatible",
+        TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, 
renamedSchema));
+
+    assertFalse("Deleted single field is not compatible",
+        TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA_DEVOLVED));
+    String deletedMultipleFieldSchema = TRIP_SCHEMA_PREFIX  + 
TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
+    assertFalse("Deleted multiple fields are not compatible",
+        TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, 
deletedMultipleFieldSchema));
+
+    String renamedRecordSchema = TRIP_EXAMPLE_SCHEMA.replace("triprec", 
"triprec_renamed");
+    assertFalse("Renamed record name is not compatible",
+        TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, 
renamedRecordSchema));
+
+    String swappedFieldSchema = TRIP_SCHEMA_PREFIX + 
MAP_TYPE_SCHEMA.replace("city_to_state", "fare")
+        + FARE_NESTED_SCHEMA.replace("fare", "city_to_state") + 
TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
+    assertFalse("Swapped fields are not compatible",
+        TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, 
swappedFieldSchema));
+
+    String typeChangeSchema = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + 
FARE_NESTED_SCHEMA
+        + TIP_NESTED_SCHEMA.replace("string", "boolean") + TRIP_SCHEMA_SUFFIX;
+    assertFalse("Field type change is not compatible",
+        TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, 
typeChangeSchema));
+
+    assertTrue("Added field with default is compatible (Evolved Schema)",
+        TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, 
TRIP_EXAMPLE_SCHEMA_EVOLVED));
+
+    String multipleAddedFieldSchema = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + 
FARE_NESTED_SCHEMA
+        + TIP_NESTED_SCHEMA + EXTRA_FIELD_SCHEMA + 
EXTRA_FIELD_SCHEMA.replace("new_field", "new_new_field")
+        + TRIP_SCHEMA_SUFFIX;
+    assertTrue("Multiple added fields with defauls are compatible",
+        TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, 
multipleAddedFieldSchema));
+  }
+
+  @Test
+  public void testMORTable() throws Exception {
+    tableType = HoodieTableType.MERGE_ON_READ;
+    initMetaClient();
+
+    // Create the table
+    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), 
metaClient.getBasePath(),
+        HoodieTableType.MERGE_ON_READ, 
metaClient.getTableConfig().getTableName(),
+        metaClient.getArchivePath(), 
metaClient.getTableConfig().getPayloadClass(), VERSION_1);
+
+    HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA);
+    HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
+
+    // Initial inserts with TRIP_EXAMPLE_SCHEMA
+    int numRecords = 10;
+    insertFirstBatch(hoodieWriteConfig, client, "001", initCommitTime,
+                     numRecords, HoodieWriteClient::insert, false, false, 
numRecords);
+    checkLatestDeltaCommit("001");
+
+    // Compact once so we can incrementally read later
+    assertTrue(client.scheduleCompactionAtInstant("002", Option.empty()));
+    client.compact("002");
+
+    // Updates with same schema is allowed
+    final int numUpdateRecords = 5;
+    updateBatch(hoodieWriteConfig, client, "003", "002", Option.empty(),
+                initCommitTime, numUpdateRecords, HoodieWriteClient::upsert, 
false, false, 0, 0, 0);
+    checkLatestDeltaCommit("003");
+    checkReadRecords("000", numRecords);
+
+    // Delete with same schema is allowed
+    final int numDeleteRecords = 2;
+    numRecords -= numDeleteRecords;
+    deleteBatch(hoodieWriteConfig, client, "004", "003", initCommitTime, 
numDeleteRecords,
+                HoodieWriteClient::delete, false, false, 0, 0);
+    checkLatestDeltaCommit("004");
+    checkReadRecords("000", numRecords);
+
+    // Insert with devolved schema is not allowed
+    HoodieWriteConfig hoodieDevolvedWriteConfig = 
getWriteConfig(TRIP_EXAMPLE_SCHEMA_DEVOLVED);
+    client = getHoodieWriteClient(hoodieDevolvedWriteConfig, false);
+    final List<HoodieRecord> failedRecords = generateInsertsWithSchema("004", 
numRecords, TRIP_EXAMPLE_SCHEMA_DEVOLVED);
+    try {
+      // We cannot use insertBatch directly here because we want to insert 
records
+      // with a devolved schema and insertBatch inserts records using the 
TRIP_EXAMPLE_SCHEMA.
+      writeBatch(client, "005", "004", Option.empty(), "003", numRecords,
+          (String s, Integer a) -> failedRecords, HoodieWriteClient::insert, 
false, 0, 0, 0);
+      fail("Insert with devolved scheme should fail");
+    } catch (HoodieInsertException ex) {
+      // no new commit
+      checkLatestDeltaCommit("004");
+      checkReadRecords("000", numRecords);
+      client.rollback("005");
+    }
+
+    // Update with devolved schema is also not allowed
+    try {
+      updateBatch(hoodieDevolvedWriteConfig, client, "005", "004", 
Option.empty(),
+                  initCommitTime, numUpdateRecords, HoodieWriteClient::upsert, 
false, false, 0, 0, 0);
+      fail("Update with devolved scheme should fail");
+    } catch (HoodieUpsertException ex) {
+      // no new commit
+      checkLatestDeltaCommit("004");
+      checkReadRecords("000", numRecords);
+      client.rollback("005");
+    }
+
+    // Insert with an evolved schema is allowed
+    HoodieWriteConfig hoodieEvolvedWriteConfig = 
getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED);
+    client = getHoodieWriteClient(hoodieEvolvedWriteConfig, false);
+
+    // We cannot use insertBatch directly here because we want to insert 
records
+    // with an evolved schema and insertBatch inserts records using the 
TRIP_EXAMPLE_SCHEMA.
+    final List<HoodieRecord> evolvedRecords = generateInsertsWithSchema("005", 
numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED);
+    writeBatch(client, "005", "004", Option.empty(), initCommitTime, 
numRecords,
+        (String s, Integer a) -> evolvedRecords, HoodieWriteClient::insert, 
false, 0, 0, 0);
+
+    // new commit
+    checkLatestDeltaCommit("005");
+    checkReadRecords("000", 2 * numRecords);
+
+    // Updates with evolved schema is allowed
+    final List<HoodieRecord> updateRecords = generateUpdatesWithSchema("006", 
numUpdateRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED);
+    writeBatch(client, "006", "005", Option.empty(), initCommitTime,
+        numUpdateRecords, (String s, Integer a) -> updateRecords, 
HoodieWriteClient::upsert, false, 0, 0, 0);
+    // new commit
+    checkLatestDeltaCommit("006");
+    checkReadRecords("000", 2 * numRecords);
+
+    // Now even the original schema cannot be used for updates as it is 
devolved in relation to the
+    // current schema of the dataset.
+    client = getHoodieWriteClient(hoodieWriteConfig, false);
+    try {
+      updateBatch(hoodieWriteConfig, client, "007", "006", Option.empty(),
+                  initCommitTime, numUpdateRecords, HoodieWriteClient::upsert, 
false, false, 0, 0, 0);
+      fail("Update with original scheme should fail");
+    } catch (HoodieUpsertException ex) {
+      // no new commit
+      checkLatestDeltaCommit("006");
+      checkReadRecords("000", 2 * numRecords);
+      client.rollback("007");
+    }
+
+    // Now even the original schema cannot be used for inserts as it is 
devolved in relation to the
+    // current schema of the dataset.
+    try {
+      // We are not using insertBatch directly here because insertion of these
+      // records will fail and we dont want to keep these records within 
HoodieTestDataGenerator as we
+      // will be testing updates later.
+      failedRecords.clear();
+      failedRecords.addAll(dataGen.generateInserts("007", numRecords));
+      writeBatch(client, "007", "006", Option.empty(), initCommitTime, 
numRecords,
+          (String s, Integer a) -> failedRecords, HoodieWriteClient::insert, 
true, numRecords, numRecords, 1);
+      fail("Insert with original scheme should fail");
+    } catch (HoodieInsertException ex) {
+      // no new commit
+      checkLatestDeltaCommit("006");
+      checkReadRecords("000", 2 * numRecords);
+      client.rollback("007");
+
+      // Remove the inserts from the in-memory state of HoodieTestDataGenerator
+      // as these records were never inserted in the dataset. This is required 
so
+      // that future calls to updateBatch or deleteBatch do not generate 
updates
+      // or deletes for records which do not even exist.
+      for (HoodieRecord record : failedRecords) {
+        assertTrue(dataGen.deleteExistingKeyIfPresent(record.getKey()));
+      }
+    }
+
+    // Rollback to the original schema
+    client.restoreToInstant("004");
+    checkLatestDeltaCommit("004");
+
+    // Updates with original schema are now allowed
+    client = getHoodieWriteClient(hoodieWriteConfig, false);
+    updateBatch(hoodieWriteConfig, client, "008", "004", Option.empty(),
+                initCommitTime, numUpdateRecords, HoodieWriteClient::upsert, 
false, false, 0, 0, 0);
+    // new commit
+    checkLatestDeltaCommit("008");
+    checkReadRecords("000", 2 * numRecords);
+
+    // Insert with original schema is allowed now
+    insertBatch(hoodieWriteConfig, client, "009", "008", numRecords, 
HoodieWriteClient::insert,
+        false, false, 0, 0, 0);
+    checkLatestDeltaCommit("009");
+    checkReadRecords("000", 3 * numRecords);
+  }
+
+  /**
+   * Walks a COPY_ON_WRITE table through a sequence of writes made with schema validation enabled
+   * (see getWriteConfig): writes with TRIP_EXAMPLE_SCHEMA succeed, writes with TRIP_EXAMPLE_SCHEMA_DEVOLVED
+   * are expected to fail, writes with TRIP_EXAMPLE_SCHEMA_EVOLVED succeed, after which writes with the
+   * original schema are expected to fail until the table is restored to instant "003".
+   */
+  @Test
+  public void testCopyOnWriteTable() throws Exception {
+    // Create the table
+    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), 
metaClient.getBasePath(),
+        HoodieTableType.COPY_ON_WRITE, 
metaClient.getTableConfig().getTableName(),
+        metaClient.getArchivePath(), 
metaClient.getTableConfig().getPayloadClass(), VERSION_1);
+
+    HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA);
+    HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
+
+    // Initial inserts with TRIP_EXAMPLE_SCHEMA
+    int numRecords = 10;
+    insertFirstBatch(hoodieWriteConfig, client, "001", initCommitTime,
+                     numRecords, HoodieWriteClient::insert, false, true, 
numRecords);
+    checkReadRecords("000", numRecords);
+
+    // Updates with same schema are allowed
+    final int numUpdateRecords = 5;
+    updateBatch(hoodieWriteConfig, client, "002", "001", Option.empty(),
+                initCommitTime, numUpdateRecords, HoodieWriteClient::upsert, 
false, true,
+                numUpdateRecords, numRecords, 2);
+    checkReadRecords("000", numRecords);
+
+    // Delete with same schema is allowed
+    final int numDeleteRecords = 2;
+    numRecords -= numDeleteRecords;
+    deleteBatch(hoodieWriteConfig, client, "003", "002", initCommitTime, 
numDeleteRecords,
+                HoodieWriteClient::delete, false, true, 0, numRecords);
+    checkReadRecords("000", numRecords);
+
+    // Insert with devolved schema is not allowed
+    HoodieWriteConfig hoodieDevolvedWriteConfig = 
getWriteConfig(TRIP_EXAMPLE_SCHEMA_DEVOLVED);
+    client = getHoodieWriteClient(hoodieDevolvedWriteConfig, false);
+    final List<HoodieRecord> failedRecords = generateInsertsWithSchema("004", 
numRecords, TRIP_EXAMPLE_SCHEMA_DEVOLVED);
+    try {
+      // We cannot use insertBatch directly here because we want to insert 
records
+      // with a devolved schema.
+      writeBatch(client, "004", "003", Option.empty(), "003", numRecords,
+          (String s, Integer a) -> failedRecords, HoodieWriteClient::insert, 
true, numRecords, numRecords, 1);
+      fail("Insert with devolved scheme should fail");
+    } catch (HoodieInsertException ex) {
+      // no new commit
+      HoodieTimeline curTimeline = 
metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants();
+      assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("003"));
+      client.rollback("004");
+    }
+
+    // Update with devolved schema is not allowed
+    try {
+      updateBatch(hoodieDevolvedWriteConfig, client, "004", "003", 
Option.empty(),
+                  initCommitTime, numUpdateRecords, HoodieWriteClient::upsert, 
false, true,
+                  numUpdateRecords, 2 * numRecords, 5);
+      fail("Update with devolved scheme should fail");
+    } catch (HoodieUpsertException ex) {
+      // no new commit
+      HoodieTimeline curTimeline = 
metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants();
+      assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("003"));
+      client.rollback("004");
+    }
+
+    // Insert with evolved schema is allowed
+    HoodieWriteConfig hoodieEvolvedWriteConfig = 
getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED);
+    client = getHoodieWriteClient(hoodieEvolvedWriteConfig, false);
+    final List<HoodieRecord> evolvedRecords = generateInsertsWithSchema("004", 
numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED);
+    // We cannot use insertBatch directly here because we want to insert 
records
+    // with an evolved schema.
+    writeBatch(client, "004", "003", Option.empty(), initCommitTime, 
numRecords,
+        (String s, Integer a) -> evolvedRecords, HoodieWriteClient::insert, 
true, numRecords, 2 * numRecords, 4);
+    // new commit
+    HoodieTimeline curTimeline = 
metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants();
+    assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("004"));
+    checkReadRecords("000", 2 * numRecords);
+
+    // Updates with evolved schema are allowed
+    final List<HoodieRecord> updateRecords = generateUpdatesWithSchema("005", 
numUpdateRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED);
+    writeBatch(client, "005", "004", Option.empty(), initCommitTime,
+        numUpdateRecords, (String s, Integer a) -> updateRecords, 
HoodieWriteClient::upsert, true, numUpdateRecords, 2 * numRecords, 5);
+    checkReadRecords("000", 2 * numRecords);
+
+    // Now even the original schema cannot be used for updates as it is 
devolved
+    // in relation to the current schema of the dataset.
+    client = getHoodieWriteClient(hoodieWriteConfig, false);
+    try {
+      updateBatch(hoodieWriteConfig, client, "006", "005", Option.empty(),
+                  initCommitTime, numUpdateRecords, HoodieWriteClient::upsert, 
false, true,
+                  numUpdateRecords, numRecords, 2);
+      fail("Update with original scheme should fail");
+    } catch (HoodieUpsertException ex) {
+      // no new commit
+      curTimeline = 
metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants();
+      assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("005"));
+      client.rollback("006");
+    }
+
+    // Now even the original schema cannot be used for inserts as it is 
devolved
+    // in relation to the current schema of the dataset.
+    try {
+      // We are not using insertBatch directly here because insertion of these
+      // records will fail and we don't want to keep these records within
+      // HoodieTestDataGenerator.
+      failedRecords.clear();
+      failedRecords.addAll(dataGen.generateInserts("006", numRecords));
+      writeBatch(client, "006", "005", Option.empty(), initCommitTime, 
numRecords,
+          (String s, Integer a) -> failedRecords, HoodieWriteClient::insert, 
true, numRecords, numRecords, 1);
+      fail("Insert with original scheme should fail");
+    } catch (HoodieInsertException ex) {
+      // no new commit
+      curTimeline = 
metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants();
+      assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("005"));
+      client.rollback("006");
+
+      // Remove the inserts from the in-memory state of HoodieTestDataGenerator
+      // as these records were never inserted in the dataset. This is required 
so
+      // that future calls to updateBatch or deleteBatch do not generate 
updates
+      // or deletes for records which do not even exist.
+      for (HoodieRecord record : failedRecords) {
+        assertTrue(dataGen.deleteExistingKeyIfPresent(record.getKey()));
+      }
+    }
+
+    // Revert to the older commit and ensure that the original schema can now
+    // be used for inserts and updates.
+    client.restoreToInstant("003");
+    curTimeline = 
metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants();
+    assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("003"));
+    checkReadRecords("000", numRecords);
+
+    // Insert with original schema is allowed now
+    insertBatch(hoodieWriteConfig, client, "007", "003", numRecords, 
HoodieWriteClient::insert,
+        false, true, numRecords, 2 * numRecords, 1);
+    checkReadRecords("000", 2 * numRecords);
+
+    // Update with original schema is allowed now
+    updateBatch(hoodieWriteConfig, client, "008", "007", Option.empty(),
+        initCommitTime, numUpdateRecords, HoodieWriteClient::upsert, false, 
true,
+        numUpdateRecords, 2 * numRecords, 5);
+    checkReadRecords("000", 2 * numRecords);
+  }
+
+  /**
+   * Asserts how many records can be read from the table since the given instant.
+   *
+   * The check is only performed when tableType is COPY_ON_WRITE; for any other table type this
+   * method currently does nothing (see the TODO below).
+   *
+   * @param instantTime instant passed to HoodieClientTestUtils.readSince as the starting point
+   * @param numExpectedRecords number of records the read is expected to return
+   * @throws IOException declared by the original signature
+   */
+  private void checkReadRecords(String instantTime, int numExpectedRecords) throws IOException {
+    if (tableType != HoodieTableType.COPY_ON_WRITE) {
+      // TODO: This code fails to read records under the following conditions:
+      // 1. No parquet files yet (i.e. no compaction done yet)
+      // 2. Log file but no base file with the same FileID
+      /*
+      FileStatus[] allFiles = HoodieTestUtils.listAllDataAndLogFilesInPath(metaClient.getFs(), basePath);
+      HoodieTimeline timeline = metaClient.reloadActiveTimeline().getCommitsTimeline();
+      HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, timeline, allFiles);
+      List<String> dataFiles = fsView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
+
+      Configuration conf = new Configuration();
+      String absTableName = "hoodie." + metaClient.getTableConfig().getTableName();
+      conf.set(absTableName + ".consume.mode", "INCREMENTAL");
+      conf.set(absTableName + ".consume.start.timestamp", instantTime);
+      conf.set(absTableName + ".consume.max.commits", "-1");
+      List<GenericRecord> recordsRead =
+          HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath, conf);
+      assertEquals(recordsRead.size(), numExpectedRecords);
+      */
+      return;
+    }
+
+    HoodieTimeline commitTimeline = metaClient.reloadActiveTimeline().getCommitTimeline();
+    assertEquals(numExpectedRecords,
+        HoodieClientTestUtils.readSince(basePath, sqlContext, commitTimeline, instantTime).count());
+  }
+
+  /**
+   * Asserts that the last completed instant on the reloaded commits timeline is a delta commit
+   * with the given instant time.
+   *
+   * Uses assertEquals rather than assertTrue(equals) so that a failure reports the expected and
+   * the actual action/timestamp.
+   *
+   * @param instantTime timestamp the last completed instant is expected to have
+   */
+  private void checkLatestDeltaCommit(String instantTime) {
+    HoodieTimeline timeline = metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+    assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION, timeline.lastInstant().get().getAction());
+    assertEquals(instantTime, timeline.lastInstant().get().getTimestamp());
+  }
+
+  /**
+   * Generates insert records for the given commit and converts them to the given schema.
+   *
+   * dataGenEvolved is used when schemaStr equals TRIP_EXAMPLE_SCHEMA_EVOLVED; dataGenDevolved is used
+   * for every other schema string.
+   *
+   * @param commitTime commit time passed to the generator
+   * @param numRecords number of inserts to generate
+   * @param schemaStr schema the generated records are converted to
+   * @return the generated records, converted via convertToSchema
+   */
+  private List<HoodieRecord> generateInsertsWithSchema(String commitTime, int numRecords, String schemaStr) {
+    HoodieTestDataGenerator generator;
+    if (schemaStr.equals(TRIP_EXAMPLE_SCHEMA_EVOLVED)) {
+      generator = dataGenEvolved;
+    } else {
+      generator = dataGenDevolved;
+    }
+    List<HoodieRecord> inserts = generator.generateInserts(commitTime, numRecords);
+    return convertToSchema(inserts, schemaStr);
+  }
+
+  private List<HoodieRecord> generateUpdatesWithSchema(String commitTime, int 
numRecords, String schemaStr) {
+    HoodieTestDataGenerator gen = 
schemaStr.equals(TRIP_EXAMPLE_SCHEMA_EVOLVED) ? dataGenEvolved : 
dataGenDevolved;
+    List<HoodieRecord> records = gen.generateUniqueUpdates(commitTime, 
numRecords);
+    return convertToSchema(records, schemaStr);
+  }
+
+  /**
+   * Rewrites the payload of every record into the given schema.
+   *
+   * Each payload is read using HoodieTestDataGenerator.AVRO_SCHEMA, rewritten with
+   * HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields and wrapped in a new TestRawTripPayload
+   * that keeps the original record key and partition path.
+   *
+   * @param records records to convert
+   * @param schemaStr Avro schema (as a string) to convert the payloads to
+   * @return new records carrying the converted payloads, in the same order as the input
+   * @throws RuntimeException if reading or rewriting a payload raises an IOException; the
+   *         IOException is attached as the cause so the root failure is not lost
+   */
+  private List<HoodieRecord> convertToSchema(List<HoodieRecord> records, String schemaStr) {
+    Schema newSchema = new Schema.Parser().parse(schemaStr);
+    return records.stream().map(r -> {
+      HoodieKey key = r.getKey();
+      try {
+        GenericRecord payload =
+            (GenericRecord) r.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get();
+        GenericRecord newPayload = HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(payload, newSchema);
+        return new HoodieRecord(key,
+            new TestRawTripPayload(newPayload.toString(), key.getRecordKey(), key.getPartitionPath(), schemaStr));
+      } catch (IOException e) {
+        // Keep the original exception as the cause; the message alone does not say what went wrong.
+        throw new RuntimeException("Conversion to new schema failed", e);
+      }
+    }).collect(Collectors.toList());
+  }
+
+  /**
+   * Builds the write config used by these tests for the given schema: in-memory index and
+   * Avro schema validation switched on.
+   *
+   * @param schema schema string handed to getConfigBuilder
+   * @return the built write config
+   */
+  private HoodieWriteConfig getWriteConfig(String schema) {
+    HoodieIndexConfig inMemoryIndexConfig =
+        HoodieIndexConfig.newBuilder().withIndexType(IndexType.INMEMORY).build();
+    return getConfigBuilder(schema).withIndexConfig(inMemoryIndexConfig).withAvroSchemaValidate(true).build();
+  }
+
+  /**
+   * Returns the table type currently held in this test's tableType field.
+   *
+   * NOTE(review): presumably this overrides a hook of the test base class - confirm, and add
+   * an override annotation if so.
+   */
+  protected HoodieTableType getTableType() {
+    return tableType;
+  }
+}
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java 
b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
index 8e9036a..65b567c 100644
--- 
a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
+++ 
b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
@@ -662,6 +662,23 @@ public class HoodieTestDataGenerator {
     return result.stream();
   }
 
+  /**
+   * Stops tracking the given key among the existing keys recorded for TRIP_EXAMPLE_SCHEMA.
+   *
+   * When the key is found, the entry stored at index (key count - 1) is moved into the index the
+   * key occupied, the entry at (key count - 1) is removed and the recorded key count is decremented.
+   *
+   * @param key key to remove
+   * @return true if the key was tracked and has been removed, false if it was not found
+   */
+  public boolean deleteExistingKeyIfPresent(HoodieKey key) {
+    Map<Integer, KeyPartition> trackedKeys = existingKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
+    Integer trackedKeyCount = numKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
+    for (Map.Entry<Integer, KeyPartition> candidate : trackedKeys.entrySet()) {
+      if (!candidate.getValue().key.equals(key)) {
+        continue;
+      }
+
+      // Fill the vacated slot with the last entry, then drop the last slot.
+      int vacatedIndex = candidate.getKey();
+      int lastIndex = trackedKeyCount - 1;
+      trackedKeys.put(vacatedIndex, trackedKeys.get(lastIndex));
+      trackedKeys.remove(lastIndex);
+      numKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, lastIndex);
+      return true;
+    }
+
+    return false;
+  }
+
   public String[] getPartitionPaths() {
     return partitionPaths;
   }
@@ -679,4 +696,4 @@ public class HoodieTestDataGenerator {
   public void close() {
     existingKeysBySchema.clear();
   }
-}
+}
\ No newline at end of file
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index af44a31..20b49e2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -97,6 +97,14 @@ public class HoodieAvroUtils {
         || HoodieRecord.FILENAME_METADATA_FIELD.equals(fieldName);
   }
 
+  /**
+   * Creates the schema used for writing by adding the Hoodie metadata fields to the given schema.
+   *
+   * @param originalSchema schema to extend
+   * @return the result of addMetadataFields for the given schema
+   */
+  public static Schema createHoodieWriteSchema(Schema originalSchema) {
+    return HoodieAvroUtils.addMetadataFields(originalSchema);
+  }
+
+  /**
+   * Parses the given Avro schema string and creates the write schema from the parsed schema.
+   *
+   * @param originalSchema Avro schema as a string
+   * @return the write schema produced by the Schema-typed overload for the parsed schema
+   */
+  public static Schema createHoodieWriteSchema(String originalSchema) {
+    return createHoodieWriteSchema(new Schema.Parser().parse(originalSchema));
+  }
+
   /**
    * Adds the Hoodie metadata fields to the given schema.
    */
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
new file mode 100644
index 0000000..0bab862
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table;
+
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.SchemaCompatibility;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
+import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.InvalidTableException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * Helper class to read schema from data files and log files and to convert it 
between different formats.
+ */
+public class TableSchemaResolver {
+
+  private static final Logger LOG = 
LogManager.getLogger(TableSchemaResolver.class);
+  private HoodieTableMetaClient metaClient;
+
+  /**
+   * Creates a resolver that reads schemas through the given meta client.
+   *
+   * @param metaClient meta client stored by this resolver and used by its instance methods
+   */
+  public TableSchemaResolver(HoodieTableMetaClient metaClient) {
+    this.metaClient = metaClient;
+  }
+
+  /**
+   * Gets the schema for a hoodie table by reading it from a file referenced by the latest completed commit.
+   * We will assume that the schema has not changed within a single atomic write.
+   *
+   * COPY_ON_WRITE: the schema is read from any file listed in the metadata of the last completed instant
+   * on the commits timeline.
+   *
+   * MERGE_ON_READ: the last completed delta commit after the last completed compaction commit is looked up
+   * (or simply the last completed delta commit when there is no compaction commit). When such a delta commit
+   * exists, the schema is read from a log file listed in its metadata, falling back to a base file when the
+   * metadata lists no log file. Otherwise the schema is read via readSchemaFromLastCompaction.
+   *
+   * @return Parquet schema for this table
+   * @throws InvalidTableException if a COPY_ON_WRITE table has no completed commit, or the table type is unknown
+   * @throws IllegalArgumentException if the chosen commit metadata lists no usable data file
+   * @throws HoodieException if an IOException is raised while reading the schema
+   * @throws Exception any other failure propagated from the schema readers called here
+   */
+  public MessageType getDataSchema() throws Exception {
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+
+    try {
+      switch (metaClient.getTableType()) {
+        case COPY_ON_WRITE:
+          // If this is COW, get the last commit and read the schema from a 
file written in the
+          // last commit
+          HoodieInstant lastCommit =
+              
activeTimeline.getCommitsTimeline().filterCompletedInstants().lastInstant().orElseThrow(()
 -> new InvalidTableException(metaClient.getBasePath()));
+          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+              .fromBytes(activeTimeline.getInstantDetails(lastCommit).get(), 
HoodieCommitMetadata.class);
+          String filePath = 
commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny()
+              .orElseThrow(() -> new IllegalArgumentException("Could not find 
any data file written for commit "
+                  + lastCommit + ", could not get schema for table " + 
metaClient.getBasePath() + ", Metadata :"
+                  + commitMetadata));
+          return readSchemaFromBaseFile(new Path(filePath));
+        case MERGE_ON_READ:
+          // If this is MOR, depending on whether the latest commit is a delta 
commit or
+          // compaction commit
+          // Get a datafile written and get the schema from that file
+          Option<HoodieInstant> lastCompactionCommit =
+              
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
+          LOG.info("Found the last compaction commit as " + 
lastCompactionCommit);
+
+          Option<HoodieInstant> lastDeltaCommit;
+          if (lastCompactionCommit.isPresent()) {
+            lastDeltaCommit = 
metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
+                .findInstantsAfter(lastCompactionCommit.get().getTimestamp(), 
Integer.MAX_VALUE).lastInstant();
+          } else {
+            lastDeltaCommit =
+                
metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
+          }
+          LOG.info("Found the last delta commit " + lastDeltaCommit);
+
+          if (lastDeltaCommit.isPresent()) {
+            HoodieInstant lastDeltaInstant = lastDeltaCommit.get();
+            // read from a log file written by this delta commit
+            commitMetadata = 
HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(lastDeltaInstant).get(),
+                HoodieCommitMetadata.class);
+            Pair<String, HoodieFileFormat> filePathWithFormat =
+                
commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream()
+                    .filter(s -> 
s.contains(HoodieLogFile.DELTA_EXTENSION)).findAny()
+                    .map(f -> Pair.of(f, 
HoodieFileFormat.HOODIE_LOG)).orElseGet(() -> {
+                      // No Log files in Delta-Commit. Check if there are any 
parquet files
+                      return 
commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream()
+                          .filter(s -> 
s.contains((metaClient.getTableConfig().getBaseFileFormat().getFileExtension())))
+                          .findAny().map(f -> Pair.of(f, 
HoodieFileFormat.PARQUET)).orElseThrow(() ->
+                              new IllegalArgumentException("Could not find any 
data file written for commit "
+                              + lastDeltaInstant + ", could not get schema for 
table " + metaClient.getBasePath()
+                              + ", CommitMetadata :" + commitMetadata));
+                    });
+            switch (filePathWithFormat.getRight()) {
+              case HOODIE_LOG:
+                return readSchemaFromLogFile(lastCompactionCommit, new 
Path(filePathWithFormat.getLeft()));
+              case PARQUET:
+                return readSchemaFromBaseFile(new 
Path(filePathWithFormat.getLeft()));
+              default:
+                throw new IllegalArgumentException("Unknown file format :" + 
filePathWithFormat.getRight()
+                    + " for file " + filePathWithFormat.getLeft());
+            }
+          } else {
+            return readSchemaFromLastCompaction(lastCompactionCommit);
+          }
+        default:
+          LOG.error("Unknown table type " + metaClient.getTableType());
+          throw new InvalidTableException(metaClient.getBasePath());
+      }
+    } catch (IOException e) {
+      throw new HoodieException("Failed to read data schema", e);
+    }
+  }
+
+  /**
+   * Gets the schema for a hoodie table in Avro format.
+   *
+   * The Parquet schema returned by getDataSchema is converted with convertParquetSchemaToAvro.
+   *
+   * @return Avro schema for this table
+   * @throws Exception if getDataSchema fails to resolve the Parquet schema
+   */
+  public Schema getTableSchema() throws Exception {
+    return convertParquetSchemaToAvro(getDataSchema());
+  }
+
+  /**
+   * Gets the Avro schema of a hoodie table from the HoodieCommitMetadata of the last completed instant
+   * on the commits timeline.
+   *
+   * The schema string is looked up in the commit's extra metadata under HoodieCommitMetadata.SCHEMA_KEY
+   * and parsed as an Avro schema.
+   *
+   * @return Avro schema for this table
+   * @throws HoodieException wrapping any exception raised while locating, reading or parsing the schema
+   * @throws Exception declared by the signature
+   */
+  public Schema getTableSchemaFromCommitMetadata() throws Exception {
+    try {
+      HoodieTimeline completedCommits =
+          metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+      HoodieInstant latestInstant = completedCommits.lastInstant().get();
+      byte[] instantDetails = completedCommits.getInstantDetails(latestInstant).get();
+      HoodieCommitMetadata commitMetadata =
+          HoodieCommitMetadata.fromBytes(instantDetails, HoodieCommitMetadata.class);
+      String schemaStr = commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
+      return new Schema.Parser().parse(schemaStr);
+    } catch (Exception e) {
+      throw new HoodieException("Failed to read schema from commit metadata", e);
+    }
+  }
+
+  /**
+   * Converts a Parquet schema to the Avro format, using a converter configured with the
+   * meta client's Hadoop configuration.
+   *
+   * @param parquetSchema The parquet schema to convert
+   * @return The converted avro schema
+   */
+  public Schema convertParquetSchemaToAvro(MessageType parquetSchema) {
+    return new AvroSchemaConverter(metaClient.getHadoopConf()).convert(parquetSchema);
+  }
+
+  /**
+   * HUDI specific validation of schema evolution. Ensures that a newer schema 
can be used for the dataset by
+   * checking if the data written using the old schema can be read using the 
new schema.
+   *
+   * HUDI requires a Schema to be specified in HoodieWriteConfig and is used 
by the HoodieWriteClient to
+   * create the records. The schema is also saved in the data files (parquet 
format) and log files (avro format).
+   * Since a schema is required each time new data is ingested into a HUDI 
dataset, schema can be evolved over time.
+   *
+   * New Schema is compatible only if:
+   * A1. There is no change in schema
+   * A2. A field has been added and it has a default value specified
+   *
+   * New Schema is incompatible if:
+   * B1. A field has been deleted
+   * B2. A field has been renamed (treated as delete + add)
+   * B3. A field's type has changed to be incompatible with the older type
+   *
+   * Issue with org.apache.avro.SchemaCompatibility:
+   *  org.apache.avro.SchemaCompatibility checks schema compatibility between 
a writer schema (which originally wrote
+   *  the AVRO record) and a readerSchema (with which we are reading the 
record). It ONLY guarantees that that each
+   *  field in the reader record can be populated from the writer record. 
Hence, if the reader schema is missing a
+   *  field, it is still compatible with the writer schema.
+   *
+   *  In other words, org.apache.avro.SchemaCompatibility was written to 
guarantee that we can read the data written
+   *  earlier. It does not guarantee schema evolution for HUDI (B1 above).
+   *
+   * Implementation: This function implements specific HUDI specific checks 
(listed below) and defers the remaining
+   * checks to the org.apache.avro.SchemaCompatibility code.
+   *
+   * Checks:
+   * C1. If there is no change in schema: success
+   * C2. If a field has been deleted in new schema: failure
+   * C3. If a field has been added in new schema: it should have default value 
specified
+   * C4. If a field has been renamed(treated as delete + add): failure
+   * C5. If a field type has changed: failure
+   *
+   * @param oldSchema Older schema to check.
+   * @param newSchema Newer schema to check.
+   * @return True if the schema validation is successful
+   */
+  public static boolean isSchemaCompatible(Schema oldSchema, Schema newSchema) 
{
+    if (oldSchema.getType() == newSchema.getType() && newSchema.getType() == 
Schema.Type.RECORD) {
+      // record names must match:
+      if (!SchemaCompatibility.schemaNameEquals(oldSchema, newSchema)) {
+        return false;
+      }
+
+      // Check that each field in the oldSchema can populated the newSchema
+      for (final Field oldSchemaField : oldSchema.getFields()) {
+        final Field newSchemaField = 
SchemaCompatibility.lookupWriterField(newSchema, oldSchemaField);
+        if (newSchemaField == null) {
+          // C4 or C2: newSchema does not correspond to any field in the 
oldSchema
+          return false;
+        } else {
+          if (!isSchemaCompatible(oldSchemaField.schema(), 
newSchemaField.schema())) {
+            // C5: The fields do not have a compatible type
+            return false;
+          }
+        }
+      }
+
+      // Check that new fields added in newSchema have default values as they 
will not be
+      // present in oldSchema and hence cannot be populated on reading records 
from existing data.
+      for (final Field newSchemaField : newSchema.getFields()) {
+        final Field oldSchemaField = 
SchemaCompatibility.lookupWriterField(oldSchema, newSchemaField);
+        if (oldSchemaField == null) {
+          if (newSchemaField.defaultValue() == null) {
+            // C3: newly added field in newSchema does not have a default value
+            return false;
+          }
+        }
+      }
+
+      // All fields in the newSchema record can be populated from the 
oldSchema record
+      return true;
+    } else {
+      // Use the checks implemented by org.apache.avro.SchemaCompatibility
+      org.apache.avro.SchemaCompatibility.SchemaPairCompatibility compatResult 
=
+          
org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(oldSchema, 
newSchema);
+      return compatResult.getType() == 
org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
+    }
+  }
+
+  public static boolean isSchemaCompatible(String oldSchema, String newSchema) 
{
+    return isSchemaCompatible(new Schema.Parser().parse(oldSchema), new 
Schema.Parser().parse(newSchema));
+  }
+
+  /**
+   * Read the parquet schema from a parquet File.
+   */
+  public MessageType readSchemaFromBaseFile(Path parquetFilePath) throws 
IOException {
+    LOG.info("Reading schema from " + parquetFilePath);
+
+    FileSystem fs = metaClient.getRawFs();
+    if (!fs.exists(parquetFilePath)) {
+      throw new IllegalArgumentException(
+          "Failed to read schema from data file " + parquetFilePath + ". File 
does not exist.");
+    }
+    ParquetMetadata fileFooter =
+        ParquetFileReader.readFooter(fs.getConf(), parquetFilePath, 
ParquetMetadataConverter.NO_FILTER);
+    return fileFooter.getFileMetaData().getSchema();
+  }
+
+  /**
+   * Read schema from a data file from the last compaction commit done.
+   * @throws Exception
+   */
+  public MessageType readSchemaFromLastCompaction(Option<HoodieInstant> 
lastCompactionCommitOpt) throws Exception {
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+
+    HoodieInstant lastCompactionCommit = 
lastCompactionCommitOpt.orElseThrow(() -> new Exception(
+        "Could not read schema from last compaction, no compaction commits 
found on path " + metaClient));
+
+    // Read from a data file written by the compaction
+    HoodieCommitMetadata compactionMetadata = HoodieCommitMetadata
+        
.fromBytes(activeTimeline.getInstantDetails(lastCompactionCommit).get(), 
HoodieCommitMetadata.class);
+    String filePath = 
compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny()
+        .orElseThrow(() -> new IllegalArgumentException("Could not find any 
data file written for compaction "
+            + lastCompactionCommit + ", could not get schema for table " + 
metaClient.getBasePath()));
+    return readSchemaFromBaseFile(new Path(filePath));
+  }
+
+  /**
+   * Read the schema from the log file on path.
+   *
+   * @return Schema of the last avro data block in the log file, or null if there is none
+   */
+  public MessageType readSchemaFromLogFile(Path path) throws IOException {
+    FileSystem fs = metaClient.getRawFs();
+    Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), 
null);
+    HoodieAvroDataBlock lastBlock = null;
+    while (reader.hasNext()) {
+      HoodieLogBlock block = reader.next();
+      if (block instanceof HoodieAvroDataBlock) {
+        lastBlock = (HoodieAvroDataBlock) block;
+      }
+    }
+    reader.close();
+    if (lastBlock != null) {
+      return new AvroSchemaConverter().convert(lastBlock.getSchema());
+    }
+    return null;
+  }
+
+  /**
+   * Read the schema from the log file on path, falling back to the last compaction if none is found.
+   * @throws Exception
+   */
+  public MessageType readSchemaFromLogFile(Option<HoodieInstant> 
lastCompactionCommitOpt, Path path)
+      throws Exception {
+    MessageType messageType = readSchemaFromLogFile(path);
+    // Fall back to read the schema from last compaction
+    if (messageType == null) {
+      LOG.info("Falling back to read the schema from last compaction " + 
lastCompactionCommitOpt);
+      return readSchemaFromLastCompaction(lastCompactionCommitOpt);
+    }
+    return messageType;
+  }
+
+  /**
+   * Read the schema from the log file on path.
+   *
+   * @return Schema of the last avro data block in the log file, or null if there is none
+   */
+  public static MessageType readSchemaFromLogFile(FileSystem fs, Path path) 
throws IOException {
+    Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), 
null);
+    HoodieAvroDataBlock lastBlock = null;
+    while (reader.hasNext()) {
+      HoodieLogBlock block = reader.next();
+      if (block instanceof HoodieAvroDataBlock) {
+        lastBlock = (HoodieAvroDataBlock) block;
+      }
+    }
+    reader.close();
+    if (lastBlock != null) {
+      return new AvroSchemaConverter().convert(lastBlock.getSchema());
+    }
+    return null;
+  }
+}
diff --git 
a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java 
b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index e6572c9..273635c 100644
--- a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -25,7 +25,7 @@ import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
 import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent;
 import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent.PartitionEventType;
-import org.apache.hudi.hive.util.SchemaUtil;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
 
 import com.beust.jcommander.JCommander;
 import org.apache.hadoop.conf.Configuration;
@@ -158,7 +158,7 @@ public class HiveSyncTool {
     } else {
       // Check if the table schema has evolved
       Map<String, String> tableSchema = 
hoodieHiveClient.getTableSchema(tableName);
-      SchemaDifference schemaDiff = SchemaUtil.getSchemaDifference(schema, 
tableSchema, cfg.partitionFields);
+      SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, 
tableSchema, cfg.partitionFields);
       if (!schemaDiff.isEmpty()) {
         LOG.info("Schema difference found for " + tableName);
         hoodieHiveClient.updateTableDefinition(tableName, schema);
diff --git 
a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java 
b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 55a4968..9f1a040 100644
--- a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -21,18 +21,14 @@ package org.apache.hudi.hive;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.StorageSchemes;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.InvalidTableException;
-import org.apache.hudi.hive.util.SchemaUtil;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -49,9 +45,6 @@ import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.jdbc.HiveDriver;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;
 import org.apache.thrift.TException;
 
@@ -74,7 +67,7 @@ public class HoodieHiveClient {
   private static final String HOODIE_LAST_COMMIT_TIME_SYNC = 
"last_commit_time_sync";
   // Make sure we have the hive JDBC driver in classpath
   private static String driverName = HiveDriver.class.getName();
-  private static final String HIVE_ESCAPE_CHARACTER = 
SchemaUtil.HIVE_ESCAPE_CHARACTER;
+  private static final String HIVE_ESCAPE_CHARACTER = 
HiveSchemaUtil.HIVE_ESCAPE_CHARACTER;
 
   static {
     try {
@@ -250,7 +243,7 @@ public class HoodieHiveClient {
 
   void updateTableDefinition(String tableName, MessageType newSchema) {
     try {
-      String newSchemaStr = SchemaUtil.generateSchemaString(newSchema, 
syncConfig.partitionFields);
+      String newSchemaStr = HiveSchemaUtil.generateSchemaString(newSchema, 
syncConfig.partitionFields);
       // Cascade clause should not be present for non-partitioned tables
       String cascadeClause = syncConfig.partitionFields.size() > 0 ? " 
cascade" : "";
       StringBuilder sqlBuilder = new StringBuilder("ALTER TABLE 
").append(HIVE_ESCAPE_CHARACTER)
@@ -268,7 +261,7 @@ public class HoodieHiveClient {
   void createTable(String tableName, MessageType storageSchema, String 
inputFormatClass, String outputFormatClass, String serdeClass) {
     try {
       String createSQLQuery =
-          SchemaUtil.generateCreateDDL(tableName, storageSchema, syncConfig, 
inputFormatClass, outputFormatClass, serdeClass);
+          HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, 
syncConfig, inputFormatClass, outputFormatClass, serdeClass);
       LOG.info("Creating table with " + createSQLQuery);
       updateHiveSQL(createSQLQuery);
     } catch (IOException e) {
@@ -340,123 +333,15 @@ public class HoodieHiveClient {
    *
    * @return Parquet schema for this table
    */
-  @SuppressWarnings("WeakerAccess")
   public MessageType getDataSchema() {
     try {
-      switch (tableType) {
-        case COPY_ON_WRITE:
-          // If this is COW, get the last commit and read the schema from a 
file written in the
-          // last commit
-          HoodieInstant lastCommit =
-              activeTimeline.lastInstant().orElseThrow(() -> new 
InvalidTableException(syncConfig.basePath));
-          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-              .fromBytes(activeTimeline.getInstantDetails(lastCommit).get(), 
HoodieCommitMetadata.class);
-          String filePath = 
commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny()
-              .orElseThrow(() -> new IllegalArgumentException("Could not find 
any data file written for commit "
-                  + lastCommit + ", could not get schema for table " + 
metaClient.getBasePath() + ", Metadata :"
-                  + commitMetadata));
-          return readSchemaFromBaseFile(new Path(filePath));
-        case MERGE_ON_READ:
-          // If this is MOR, depending on whether the latest commit is a delta 
commit or
-          // compaction commit
-          // Get a datafile written and get the schema from that file
-          Option<HoodieInstant> lastCompactionCommit =
-              
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
-          LOG.info("Found the last compaction commit as " + 
lastCompactionCommit);
-
-          Option<HoodieInstant> lastDeltaCommit;
-          if (lastCompactionCommit.isPresent()) {
-            lastDeltaCommit = 
metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
-                .findInstantsAfter(lastCompactionCommit.get().getTimestamp(), 
Integer.MAX_VALUE).lastInstant();
-          } else {
-            lastDeltaCommit =
-                
metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
-          }
-          LOG.info("Found the last delta commit " + lastDeltaCommit);
-
-          if (lastDeltaCommit.isPresent()) {
-            HoodieInstant lastDeltaInstant = lastDeltaCommit.get();
-            // read from the log file wrote
-            commitMetadata = 
HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(lastDeltaInstant).get(),
-                HoodieCommitMetadata.class);
-            Pair<String, HoodieFileFormat> filePathWithFormat =
-                
commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream()
-                    .filter(s -> 
s.contains(HoodieLogFile.DELTA_EXTENSION)).findAny()
-                    .map(f -> Pair.of(f, 
HoodieFileFormat.HOODIE_LOG)).orElseGet(() -> {
-                      // No Log files in Delta-Commit. Check if there are any 
parquet files
-                      return 
commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream()
-                          .filter(s -> 
s.contains((metaClient.getTableConfig().getBaseFileFormat().getFileExtension())))
-                          .findAny().map(f -> Pair.of(f, 
HoodieFileFormat.PARQUET)).orElseThrow(() ->
-                              new IllegalArgumentException("Could not find any 
data file written for commit "
-                              + lastDeltaInstant + ", could not get schema for 
table " + metaClient.getBasePath()
-                              + ", CommitMetadata :" + commitMetadata));
-                    });
-            switch (filePathWithFormat.getRight()) {
-              case HOODIE_LOG:
-                return readSchemaFromLogFile(lastCompactionCommit, new 
Path(filePathWithFormat.getLeft()));
-              case PARQUET:
-                return readSchemaFromBaseFile(new 
Path(filePathWithFormat.getLeft()));
-              default:
-                throw new IllegalArgumentException("Unknown file format :" + 
filePathWithFormat.getRight()
-                    + " for file " + filePathWithFormat.getLeft());
-            }
-          } else {
-            return readSchemaFromLastCompaction(lastCompactionCommit);
-          }
-        default:
-          LOG.error("Unknown table type " + tableType);
-          throw new InvalidTableException(syncConfig.basePath);
-      }
-    } catch (IOException e) {
+      return new TableSchemaResolver(metaClient).getDataSchema();
+    } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to read data schema", e);
     }
   }
 
   /**
-   * Read schema from a data file from the last compaction commit done.
-   */
-  private MessageType readSchemaFromLastCompaction(Option<HoodieInstant> 
lastCompactionCommitOpt) throws IOException {
-    HoodieInstant lastCompactionCommit = 
lastCompactionCommitOpt.orElseThrow(() -> new HoodieHiveSyncException(
-        "Could not read schema from last compaction, no compaction commits 
found on path " + syncConfig.basePath));
-
-    // Read from the compacted file wrote
-    HoodieCommitMetadata compactionMetadata = HoodieCommitMetadata
-        
.fromBytes(activeTimeline.getInstantDetails(lastCompactionCommit).get(), 
HoodieCommitMetadata.class);
-    String filePath = 
compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny()
-        .orElseThrow(() -> new IllegalArgumentException("Could not find any 
data file written for compaction "
-            + lastCompactionCommit + ", could not get schema for table " + 
metaClient.getBasePath()));
-    return readSchemaFromBaseFile(new Path(filePath));
-  }
-
-  /**
-   * Read the schema from the log file on path.
-   */
-  private MessageType readSchemaFromLogFile(Option<HoodieInstant> 
lastCompactionCommitOpt, Path path)
-      throws IOException {
-    MessageType messageType = SchemaUtil.readSchemaFromLogFile(fs, path);
-    // Fall back to read the schema from last compaction
-    if (messageType == null) {
-      LOG.info("Falling back to read the schema from last compaction " + 
lastCompactionCommitOpt);
-      return readSchemaFromLastCompaction(lastCompactionCommitOpt);
-    }
-    return messageType;
-  }
-
-  /**
-   * Read the parquet schema from a parquet File.
-   */
-  private MessageType readSchemaFromBaseFile(Path parquetFilePath) throws 
IOException {
-    LOG.info("Reading schema from " + parquetFilePath);
-    if (!fs.exists(parquetFilePath)) {
-      throw new IllegalArgumentException(
-          "Failed to read schema from data file " + parquetFilePath + ". File 
does not exist.");
-    }
-    ParquetMetadata fileFooter =
-        ParquetFileReader.readFooter(fs.getConf(), parquetFilePath, 
ParquetMetadataConverter.NO_FILTER);
-    return fileFooter.getFileMetaData().getSchema();
-  }
-
-  /**
    * @return true if the configured table exists
    */
   public boolean doesTableExist(String tableName) {
diff --git 
a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java 
b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
similarity index 93%
rename from 
hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
rename to 
hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
index 499f2a7..7fd64bd 100644
--- a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
+++ b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
@@ -18,20 +18,12 @@
 
 package org.apache.hudi.hive.util;
 
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
-import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveSyncException;
 import org.apache.hudi.hive.SchemaDifference;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.schema.DecimalMetadata;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
@@ -50,9 +42,9 @@ import java.util.Set;
 /**
  * Schema Utilities.
  */
-public class SchemaUtil {
+public class HiveSchemaUtil {
 
-  private static final Logger LOG = LogManager.getLogger(SchemaUtil.class);
+  private static final Logger LOG = LogManager.getLogger(HiveSchemaUtil.class);
   public static final String HIVE_ESCAPE_CHARACTER = "`";
 
   /**
@@ -424,25 +416,4 @@ public class SchemaUtil {
     // Dont do that
     return "String";
   }
-
-  /**
-   * Read the schema from the log file on path.
-   * 
-   * @return
-   */
-  public static MessageType readSchemaFromLogFile(FileSystem fs, Path path) 
throws IOException {
-    Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), 
null);
-    HoodieAvroDataBlock lastBlock = null;
-    while (reader.hasNext()) {
-      HoodieLogBlock block = reader.next();
-      if (block instanceof HoodieAvroDataBlock) {
-        lastBlock = (HoodieAvroDataBlock) block;
-      }
-    }
-    reader.close();
-    if (lastBlock != null) {
-      return new AvroSchemaConverter().convert(lastBlock.getSchema());
-    }
-    return null;
-  }
 }
diff --git 
a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java 
b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index d92c9d9..14dfada 100644
--- a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -22,7 +22,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SchemaTestUtil;
 import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent;
 import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent.PartitionEventType;
-import org.apache.hudi.hive.util.SchemaUtil;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
 
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.parquet.schema.MessageType;
@@ -79,7 +79,7 @@ public class TestHiveSyncTool {
         
.optional(PrimitiveType.PrimitiveTypeName.INT32).named("element").named("list").named("int_list")
         .named("ArrayOfInts");
 
-    String schemaString = SchemaUtil.generateSchemaString(schema);
+    String schemaString = HiveSchemaUtil.generateSchemaString(schema);
     assertEquals("`int_list` ARRAY< int>", schemaString);
 
     // A array of arrays
@@ -87,14 +87,14 @@ public class TestHiveSyncTool {
         
.as(OriginalType.LIST).repeatedGroup().required(PrimitiveType.PrimitiveTypeName.INT32).named("element")
         
.named("list").named("element").named("list").named("int_list_list").named("ArrayOfArrayOfInts");
 
-    schemaString = SchemaUtil.generateSchemaString(schema);
+    schemaString = HiveSchemaUtil.generateSchemaString(schema);
     assertEquals("`int_list_list` ARRAY< ARRAY< int>>", schemaString);
 
     // A list of integers
     schema = 
Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeated(PrimitiveType.PrimitiveTypeName.INT32)
         .named("element").named("int_list").named("ArrayOfInts");
 
-    schemaString = SchemaUtil.generateSchemaString(schema);
+    schemaString = HiveSchemaUtil.generateSchemaString(schema);
     assertEquals("`int_list` ARRAY< int>", schemaString);
 
     // A list of structs with two fields
@@ -102,7 +102,7 @@ public class TestHiveSyncTool {
         
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").required(PrimitiveType.PrimitiveTypeName.INT32)
         
.named("num").named("element").named("tuple_list").named("ArrayOfTuples");
 
-    schemaString = SchemaUtil.generateSchemaString(schema);
+    schemaString = HiveSchemaUtil.generateSchemaString(schema);
     assertEquals("`tuple_list` ARRAY< STRUCT< `str` : binary, `num` : int>>", 
schemaString);
 
     // A list of structs with a single field
@@ -112,7 +112,7 @@ public class TestHiveSyncTool {
         
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("array").named("one_tuple_list")
         .named("ArrayOfOneTuples");
 
-    schemaString = SchemaUtil.generateSchemaString(schema);
+    schemaString = HiveSchemaUtil.generateSchemaString(schema);
     assertEquals("`one_tuple_list` ARRAY< STRUCT< `str` : binary>>", 
schemaString);
 
     // A list of structs with a single field
@@ -122,7 +122,7 @@ public class TestHiveSyncTool {
         
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("one_tuple_list_tuple")
         .named("one_tuple_list").named("ArrayOfOneTuples2");
 
-    schemaString = SchemaUtil.generateSchemaString(schema);
+    schemaString = HiveSchemaUtil.generateSchemaString(schema);
     assertEquals("`one_tuple_list` ARRAY< STRUCT< `str` : binary>>", 
schemaString);
 
     // A list of structs with a single field
@@ -132,7 +132,7 @@ public class TestHiveSyncTool {
         
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("one_tuple_list").named("one_tuple_list")
         .named("ArrayOfOneTuples3");
 
-    schemaString = SchemaUtil.generateSchemaString(schema);
+    schemaString = HiveSchemaUtil.generateSchemaString(schema);
     assertEquals("`one_tuple_list` ARRAY< binary>", schemaString);
 
     // A list of maps
@@ -141,7 +141,7 @@ public class TestHiveSyncTool {
         
.as(OriginalType.UTF8).named("string_key").required(PrimitiveType.PrimitiveTypeName.INT32).named("int_value")
         
.named("key_value").named("array").named("map_list").named("ArrayOfMaps");
 
-    schemaString = SchemaUtil.generateSchemaString(schema);
+    schemaString = HiveSchemaUtil.generateSchemaString(schema);
     assertEquals("`map_list` ARRAY< MAP< string, int>>", schemaString);
   }
 

Reply via email to