This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 129e433  - Upgrading to Hive 2.x - Eliminating in-memory 
deltaRecordsMap - Use writerSchema to generate generic record needed by custom 
payloads - changes to make tests work with hive 2.x
129e433 is described below

commit 129e4336413fd2290e137804cf16c515c502c2f7
Author: Nishith Agarwal <[email protected]>
AuthorDate: Fri May 10 13:09:09 2019 -0700

    - Upgrading to Hive 2.x
    - Eliminating in-memory deltaRecordsMap
    - Use writerSchema to generate generic record needed by custom payloads
    - changes to make tests work with hive 2.x
---
 .../table/log/AbstractHoodieLogRecordScanner.java  |   2 -
 .../common/table/log/HoodieLogFileReader.java      |   2 +
 .../hoodie/common/table/log/HoodieLogFormat.java   |  20 +++
 .../common/table/log/HoodieLogFormatReader.java    |  10 ++
 .../uber/hoodie/common/util/HoodieAvroUtils.java   |  22 ++-
 .../uber/hoodie/common/util/LogReaderUtils.java    |  81 +++++++++++
 .../com/uber/hoodie/hadoop/HoodieInputFormat.java  |   3 +-
 .../hadoop/SafeParquetRecordReaderWrapper.java     |  11 +-
 .../realtime/AbstractRealtimeRecordReader.java     | 132 ++++++++---------
 .../hadoop/realtime/HoodieRealtimeInputFormat.java |  23 ++-
 .../realtime/HoodieRealtimeRecordReader.java       |  15 +-
 .../realtime/RealtimeCompactedRecordReader.java    |  83 ++++++-----
 .../realtime/RealtimeUnmergedRecordReader.java     |  13 +-
 .../uber/hoodie/hadoop/HoodieInputFormatTest.java  |   5 +-
 .../uber/hoodie/hadoop/InputFormatTestUtil.java    |  44 ++++++
 .../realtime/HoodieRealtimeRecordReaderTest.java   | 159 +++++++++++++++++----
 hoodie-hive/pom.xml                                |  15 ++
 .../com/uber/hoodie/hive/HoodieHiveClient.java     |   9 +-
 .../com/uber/hoodie/hive/util/HiveTestService.java |   3 +
 hoodie-utilities/pom.xml                           |  51 +++++--
 pom.xml                                            |   2 +-
 release/config/license-mappings.xml                |  40 ++++--
 22 files changed, 554 insertions(+), 191 deletions(-)

diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/AbstractHoodieLogRecordScanner.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/AbstractHoodieLogRecordScanner.java
index b0010b4..c2fe730 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/AbstractHoodieLogRecordScanner.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -310,8 +310,6 @@ public abstract class AbstractHoodieLogRecordScanner {
           processAvroDataBlock((HoodieAvroDataBlock) lastBlock);
           break;
         case DELETE_BLOCK:
-          // TODO : If delete is the only block written and/or records are 
present in parquet file
-          // TODO : Mark as tombstone (optional.empty()) for data instead of 
deleting the entry
           Arrays.stream(((HoodieDeleteBlock) 
lastBlock).getKeysToDelete()).forEach(this::processNextDeletedKey);
           break;
         case CORRUPT_BLOCK:
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java
index 8c2dea4..d062cc1 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFileReader.java
@@ -331,6 +331,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader 
{
   /**
    * hasPrev is not idempotent
    */
+  @Override
   public boolean hasPrev() {
     try {
       if (!this.reverseReader) {
@@ -352,6 +353,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader 
{
    * iterate reverse (prev) or forward (next). Doing both in the same instance 
is not supported
    * WARNING : Every call to prev() should be preceded with hasPrev()
    */
+  @Override
   public HoodieLogBlock prev() throws IOException {
 
     if (!this.reverseReader) {
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java
index 3f01179..650700a 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java
@@ -81,6 +81,19 @@ public interface HoodieLogFormat {
      * @return the path to this {@link HoodieLogFormat}
      */
     HoodieLogFile getLogFile();
+
+    /**
+     * Read log file in reverse order and check if prev block is present
+     * @return
+     */
+    public boolean hasPrev();
+
+    /**
+     * Read log file in reverse order and return prev block if present
+     * @return
+     * @throws IOException
+     */
+    public HoodieLogBlock prev() throws IOException;
   }
 
 
@@ -246,6 +259,13 @@ public interface HoodieLogFormat {
     return new HoodieLogFileReader(fs, logFile, readerSchema, 
HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false, false);
   }
 
+  static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile 
logFile, Schema readerSchema, boolean
+      readBlockLazily, boolean reverseReader)
+      throws IOException {
+    return new HoodieLogFileReader(fs, logFile, readerSchema, 
HoodieLogFileReader.DEFAULT_BUFFER_SIZE,
+        readBlockLazily, reverseReader);
+  }
+
   /**
    * A set of feature flags associated with a log format. Versions are changed 
when the log format
    * changes. TODO(na) - Implement policies around major/minor versions
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java
index 9c5f82f..39ae5a9 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java
@@ -119,4 +119,14 @@ public class HoodieLogFormatReader implements 
HoodieLogFormat.Reader {
   public void remove() {
   }
 
+  @Override
+  public boolean hasPrev() {
+    return this.currentReader.hasPrev();
+  }
+
+  @Override
+  public HoodieLogBlock prev() throws IOException {
+    return this.currentReader.prev();
+  }
+
 }
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java 
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java
index 918c9a3..9b34fab 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieAvroUtils.java
@@ -44,6 +44,7 @@ import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
 import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.node.NullNode;
 
 /**
  * Helper class to do common stuff across Avro.
@@ -156,16 +157,16 @@ public class HoodieAvroUtils {
    * Add null fields to passed in schema. Caller is responsible for ensuring 
there is no duplicates.
    * As different query engines have varying constraints regarding treating 
the case-sensitivity of fields, its best
    * to let caller determine that.
+   *
    * @param schema Passed in schema
    * @param newFieldNames Null Field names to be added
-   * @return
    */
   public static Schema appendNullSchemaFields(Schema schema, List<String> 
newFieldNames) {
     List<Field> newFields = schema.getFields().stream().map(field -> {
       return new Schema.Field(field.name(), field.schema(), field.doc(), 
field.defaultValue());
     }).collect(Collectors.toList());
     for (String newField : newFieldNames) {
-      newFields.add(new Schema.Field(newField, METADATA_FIELD_SCHEMA, "", 
null));
+      newFields.add(new Schema.Field(newField, METADATA_FIELD_SCHEMA, "", 
NullNode.getInstance()));
     }
     Schema newSchema = Schema.createRecord(schema.getName(), schema.getDoc(), 
schema.getNamespace(), schema.isError());
     newSchema.setFields(newFields);
@@ -184,11 +185,24 @@ public class HoodieAvroUtils {
 
 
   /**
-   * Given a avro record with a given schema, rewrites it into the new schema
+   * Given a avro record with a given schema, rewrites it into the new schema 
while setting fields only from the old
+   * schema
    */
   public static GenericRecord rewriteRecord(GenericRecord record, Schema 
newSchema) {
+    return rewrite(record, record.getSchema(), newSchema);
+  }
+
+  /**
+   * Given a avro record with a given schema, rewrites it into the new schema 
while setting fields only from the new
+   * schema
+   */
+  public static GenericRecord 
rewriteRecordWithOnlyNewSchemaFields(GenericRecord record, Schema newSchema) {
+    return rewrite(record, newSchema, newSchema);
+  }
+
+  private static GenericRecord rewrite(GenericRecord record, Schema 
schemaWithFields, Schema newSchema) {
     GenericRecord newRecord = new GenericData.Record(newSchema);
-    for (Schema.Field f : record.getSchema().getFields()) {
+    for (Schema.Field f : schemaWithFields.getFields()) {
       newRecord.put(f.name(), record.get(f.name()));
     }
     if (!GenericData.get().validate(newSchema, newRecord)) {
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/util/LogReaderUtils.java 
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/LogReaderUtils.java
new file mode 100644
index 0000000..ad5d500
--- /dev/null
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/LogReaderUtils.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.util;
+
+import com.uber.hoodie.common.model.HoodieLogFile;
+import com.uber.hoodie.common.table.HoodieTableMetaClient;
+import com.uber.hoodie.common.table.HoodieTimeline;
+import com.uber.hoodie.common.table.log.HoodieLogFormat;
+import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
+import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
+import 
com.uber.hoodie.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
+import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Utils class for performing various log file reading operations
+ */
+public class LogReaderUtils {
+
+  private static Schema readSchemaFromLogFileInReverse(FileSystem fs, 
HoodieActiveTimeline activeTimeline, Path path)
+      throws IOException {
+    HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(fs, new 
HoodieLogFile(path), null, true, true);
+    Schema writerSchema = null;
+    HoodieTimeline completedTimeline = 
activeTimeline.getCommitsTimeline().filterCompletedInstants();
+    while (reader.hasPrev()) {
+      HoodieLogBlock block = reader.prev();
+      if (block instanceof HoodieAvroDataBlock && block != null) {
+        HoodieAvroDataBlock lastBlock = (HoodieAvroDataBlock) block;
+        if 
(completedTimeline.containsOrBeforeTimelineStarts(lastBlock.getLogBlockHeader().get(HeaderMetadataType
+            .INSTANT_TIME))) {
+          writerSchema = 
Schema.parse(lastBlock.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+          break;
+        }
+      }
+    }
+    reader.close();
+    return writerSchema;
+  }
+
+  public static Schema readLatestSchemaFromLogFiles(String basePath, 
List<String> deltaFilePaths, JobConf jobConf)
+      throws IOException {
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jobConf, 
basePath);
+    List<String> deltaPaths = deltaFilePaths.stream().map(s -> new 
HoodieLogFile(new Path(s)))
+        .sorted(HoodieLogFile.getReverseLogFileComparator()).map(s -> 
s.getPath().toString())
+        .collect(Collectors.toList());
+    if (deltaPaths.size() > 0) {
+      for (String logPath : deltaPaths) {
+        FileSystem fs = FSUtils.getFs(logPath, jobConf);
+        Schema schemaFromLogFile =
+            readSchemaFromLogFileInReverse(fs, metaClient.getActiveTimeline(), 
new Path(logPath));
+        if (schemaFromLogFile != null) {
+          return schemaFromLogFile;
+        }
+      }
+    }
+    return null;
+  }
+
+}
diff --git 
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java 
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java
index 6df2375..c156630 100644
--- 
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java
+++ 
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
 import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
@@ -187,7 +188,7 @@ public class HoodieInputFormat extends 
MapredParquetInputFormat implements Confi
   }
 
   @Override
-  public RecordReader<Void, ArrayWritable> getRecordReader(final InputSplit 
split,
+  public RecordReader<NullWritable, ArrayWritable> getRecordReader(final 
InputSplit split,
       final JobConf job, final Reporter reporter) throws IOException {
     // TODO enable automatic predicate pushdown after fixing issues
     //        FileSplit fileSplit = (FileSplit) split;
diff --git 
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/SafeParquetRecordReaderWrapper.java
 
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/SafeParquetRecordReaderWrapper.java
index 48c3f86..274e955 100644
--- 
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/SafeParquetRecordReaderWrapper.java
+++ 
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/SafeParquetRecordReaderWrapper.java
@@ -20,6 +20,7 @@ package com.uber.hoodie.hadoop;
 
 import java.io.IOException;
 import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.RecordReader;
 
@@ -31,10 +32,10 @@ import org.apache.hadoop.mapred.RecordReader;
  * another thread, we need to ensure new instance of ArrayWritable is 
buffered. ParquetReader createKey/Value is unsafe
  * as it gets reused for subsequent fetch. This wrapper makes ParquetReader 
safe for this use-case.
  */
-public class SafeParquetRecordReaderWrapper implements RecordReader<Void, 
ArrayWritable> {
+public class SafeParquetRecordReaderWrapper implements 
RecordReader<NullWritable, ArrayWritable> {
 
   // real Parquet reader to be wrapped
-  private final RecordReader<Void, ArrayWritable> parquetReader;
+  private final RecordReader<NullWritable, ArrayWritable> parquetReader;
 
   // Value Class
   private final Class valueClass;
@@ -43,7 +44,7 @@ public class SafeParquetRecordReaderWrapper implements 
RecordReader<Void, ArrayW
   private final int numValueFields;
 
 
-  public SafeParquetRecordReaderWrapper(RecordReader<Void, ArrayWritable> 
parquetReader) {
+  public SafeParquetRecordReaderWrapper(RecordReader<NullWritable, 
ArrayWritable> parquetReader) {
     this.parquetReader = parquetReader;
     ArrayWritable arrayWritable = parquetReader.createValue();
     this.valueClass = arrayWritable.getValueClass();
@@ -51,12 +52,12 @@ public class SafeParquetRecordReaderWrapper implements 
RecordReader<Void, ArrayW
   }
 
   @Override
-  public boolean next(Void key, ArrayWritable value) throws IOException {
+  public boolean next(NullWritable key, ArrayWritable value) throws 
IOException {
     return parquetReader.next(key, value);
   }
 
   @Override
-  public Void createKey() {
+  public NullWritable createKey() {
     return parquetReader.createKey();
   }
 
diff --git 
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/AbstractRealtimeRecordReader.java
 
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/AbstractRealtimeRecordReader.java
index 2946fed..490899e 100644
--- 
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ 
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -18,13 +18,10 @@
 
 package com.uber.hoodie.hadoop.realtime;
 
-import com.uber.hoodie.common.model.HoodieLogFile;
-import com.uber.hoodie.common.table.log.HoodieLogFormat;
-import com.uber.hoodie.common.table.log.HoodieLogFormat.Reader;
-import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
-import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
-import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.model.HoodieAvroPayload;
+import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.util.HoodieAvroUtils;
+import com.uber.hoodie.common.util.LogReaderUtils;
 import com.uber.hoodie.common.util.collection.Pair;
 import com.uber.hoodie.exception.HoodieException;
 import com.uber.hoodie.exception.HoodieIOException;
@@ -44,7 +41,6 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -90,7 +86,7 @@ public abstract class AbstractRealtimeRecordReader {
   protected final HoodieRealtimeFileSplit split;
   protected final JobConf jobConf;
   private final MessageType baseFileSchema;
-
+  protected final boolean usesCustomPayload;
   // Schema handles
   private Schema readerSchema;
   private Schema writerSchema;
@@ -98,9 +94,12 @@ public abstract class AbstractRealtimeRecordReader {
   public AbstractRealtimeRecordReader(HoodieRealtimeFileSplit split, JobConf 
job) {
     this.split = split;
     this.jobConf = job;
-
     LOG.info("cfg ==> " + 
job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
+    LOG.info("columnIds ==> " + 
job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+    LOG.info("partitioningColumns ==> " + job.get("partition_columns", ""));
     try {
+      this.usesCustomPayload = usesCustomPayload();
+      LOG.info("usesCustomPayload ==> " + this.usesCustomPayload);
       baseFileSchema = readSchema(jobConf, split.getPath());
       init();
     } catch (IOException e) {
@@ -109,6 +108,12 @@ public abstract class AbstractRealtimeRecordReader {
     }
   }
 
+  private boolean usesCustomPayload() {
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jobConf, 
split.getBasePath());
+    return 
!(metaClient.getTableConfig().getPayloadClass().contains(HoodieAvroPayload.class.getName())
+        || 
metaClient.getTableConfig().getPayloadClass().contains("com.uber.hoodie.OverwriteWithLatestAvroPayload"));
+  }
+
   /**
    * Reads the schema from the parquet file. This is different from 
ParquetUtils as it uses the
    * twitter parquet to support hive 1.1.0
@@ -121,22 +126,32 @@ public abstract class AbstractRealtimeRecordReader {
     }
   }
 
+  /**
+   * Prints a JSON representation of the ArrayWritable for easier debuggability
+   */
   protected static String arrayWritableToString(ArrayWritable writable) {
     if (writable == null) {
       return "null";
     }
-
     StringBuilder builder = new StringBuilder();
     Writable[] values = writable.get();
-    builder.append(String.format("(Size: %s)[", values.length));
+    builder.append("\"values_" + Math.random() + "_" + values.length + "\": 
{");
+    int i = 0;
     for (Writable w : values) {
       if (w instanceof ArrayWritable) {
-        builder.append(arrayWritableToString((ArrayWritable) w)).append(" ");
+        builder.append(arrayWritableToString((ArrayWritable) w)).append(",");
       } else {
-        builder.append(w).append(" ");
+        builder.append("\"value" + i + "\":" + "\"" + w + "\"").append(",");
+        if (w == null) {
+          builder.append("\"type" + i + "\":" + "\"unknown\"").append(",");
+        } else {
+          builder.append("\"type" + i + "\":" + "\"" + 
w.getClass().getSimpleName() + "\"").append(",");
+        }
       }
+      i++;
     }
-    builder.append("]");
+    builder.deleteCharAt(builder.length() - 1);
+    builder.append("}");
     return builder.toString();
   }
 
@@ -187,9 +202,10 @@ public abstract class AbstractRealtimeRecordReader {
         throw new HoodieException("Field " + fn + " not found in log schema. 
Query cannot proceed! "
             + "Derived Schema Fields: "
             + new ArrayList<>(schemaFieldsMap.keySet()));
+      } else {
+        projectedFields
+            .add(new Schema.Field(field.name(), field.schema(), field.doc(), 
field.defaultValue()));
       }
-      projectedFields
-          .add(new Schema.Field(field.name(), field.schema(), field.doc(), 
field.defaultValue()));
     }
 
     Schema projectedSchema = Schema
@@ -203,17 +219,10 @@ public abstract class AbstractRealtimeRecordReader {
    */
   public static Writable avroToArrayWritable(Object value, Schema schema) {
 
-    // if value is null, make a NullWritable
-    // Hive 2.x does not like NullWritable
     if (value == null) {
-
       return null;
-      //return NullWritable.get();
     }
 
-
-    Writable[] wrapperWritable;
-
     switch (schema.getType()) {
       case STRING:
         return new Text(value.toString());
@@ -231,39 +240,38 @@ public abstract class AbstractRealtimeRecordReader {
         return new BooleanWritable((Boolean) value);
       case NULL:
         return null;
-        // return NullWritable.get();
       case RECORD:
         GenericRecord record = (GenericRecord) value;
-        Writable[] values1 = new Writable[schema.getFields().size()];
-        int index1 = 0;
+        Writable[] recordValues = new Writable[schema.getFields().size()];
+        int recordValueIndex = 0;
         for (Schema.Field field : schema.getFields()) {
-          values1[index1++] = avroToArrayWritable(record.get(field.name()), 
field.schema());
+          recordValues[recordValueIndex++] = 
avroToArrayWritable(record.get(field.name()), field.schema());
         }
-        return new ArrayWritable(Writable.class, values1);
+        return new ArrayWritable(Writable.class, recordValues);
       case ENUM:
         return new Text(value.toString());
       case ARRAY:
         GenericArray arrayValue = (GenericArray) value;
-        Writable[] values2 = new Writable[arrayValue.size()];
-        int index2 = 0;
+        Writable[] arrayValues = new Writable[arrayValue.size()];
+        int arrayValueIndex = 0;
         for (Object obj : arrayValue) {
-          values2[index2++] = avroToArrayWritable(obj, 
schema.getElementType());
+          arrayValues[arrayValueIndex++] = avroToArrayWritable(obj, 
schema.getElementType());
         }
-        wrapperWritable = new Writable[]{new ArrayWritable(Writable.class, 
values2)};
-        return new ArrayWritable(Writable.class, wrapperWritable);
+        // Hive 1.x will fail here, it requires values2 to be wrapped into 
another ArrayWritable
+        return new ArrayWritable(Writable.class, arrayValues);
       case MAP:
         Map mapValue = (Map) value;
-        Writable[] values3 = new Writable[mapValue.size()];
-        int index3 = 0;
+        Writable[] mapValues = new Writable[mapValue.size()];
+        int mapValueIndex = 0;
         for (Object entry : mapValue.entrySet()) {
           Map.Entry mapEntry = (Map.Entry) entry;
-          Writable[] mapValues = new Writable[2];
-          mapValues[0] = new Text(mapEntry.getKey().toString());
-          mapValues[1] = avroToArrayWritable(mapEntry.getValue(), 
schema.getValueType());
-          values3[index3++] = new ArrayWritable(Writable.class, mapValues);
+          Writable[] nestedMapValues = new Writable[2];
+          nestedMapValues[0] = new Text(mapEntry.getKey().toString());
+          nestedMapValues[1] = avroToArrayWritable(mapEntry.getValue(), 
schema.getValueType());
+          mapValues[mapValueIndex++] = new ArrayWritable(Writable.class, 
nestedMapValues);
         }
-        wrapperWritable = new Writable[]{new ArrayWritable(Writable.class, 
values3)};
-        return new ArrayWritable(Writable.class, wrapperWritable);
+        // Hive 1.x will fail here, it requires values3 to be wrapped into 
another ArrayWritable
+        return new ArrayWritable(Writable.class, mapValues);
       case UNION:
         List<Schema> types = schema.getTypes();
         if (types.size() != 2) {
@@ -285,29 +293,13 @@ public abstract class AbstractRealtimeRecordReader {
     }
   }
 
-  public static Schema readSchemaFromLogFile(FileSystem fs, Path path) throws 
IOException {
-    Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), 
null);
-    HoodieAvroDataBlock lastBlock = null;
-    while (reader.hasNext()) {
-      HoodieLogBlock block = reader.next();
-      if (block instanceof HoodieAvroDataBlock) {
-        lastBlock = (HoodieAvroDataBlock) block;
-      }
-    }
-    reader.close();
-    if (lastBlock != null) {
-      return lastBlock.getSchema();
-    }
-    return null;
-  }
-
   /**
    * Hive implementation of ParquetRecordReader results in partition columns 
not present in the original parquet file
    * to also be part of the projected schema. Hive expects the record reader 
implementation to return the row in its
    * entirety (with un-projected column having null values). As we use 
writerSchema for this, make sure writer schema
    * also includes partition columns
+   *
    * @param schema Schema to be changed
-   * @return
    */
   private static Schema addPartitionFields(Schema schema, List<String> 
partitioningFields) {
     final Set<String> firstLevelFieldNames = 
schema.getFields().stream().map(Field::name)
@@ -319,27 +311,26 @@ public abstract class AbstractRealtimeRecordReader {
   }
 
   /**
-   * Goes through the log files and populates a map with latest version of 
each key logged, since
-   * the base split was written.
+   * Goes through the log files in reverse order and finds the schema from the 
last available data block. If not, falls
+   * back to the schema from the latest parquet file. Finally, sets the 
partition column and projection fields into
+   * the job conf.
    */
   private void init() throws IOException {
-    writerSchema = new AvroSchemaConverter().convert(baseFileSchema);
-    List<String> fieldNames = 
writerSchema.getFields().stream().map(Field::name).collect(Collectors.toList());
-    if (split.getDeltaFilePaths().size() > 0) {
-      String logPath = 
split.getDeltaFilePaths().get(split.getDeltaFilePaths().size() - 1);
-      FileSystem fs = FSUtils.getFs(logPath, jobConf);
-      writerSchema = readSchemaFromLogFile(fs, new Path(logPath));
-      fieldNames = 
writerSchema.getFields().stream().map(Field::name).collect(Collectors.toList());
+    Schema schemaFromLogFile = LogReaderUtils
+        .readLatestSchemaFromLogFiles(split.getBasePath(), 
split.getDeltaFilePaths(), jobConf);
+    if (schemaFromLogFile == null) {
+      writerSchema = new AvroSchemaConverter().convert(baseFileSchema);
+      LOG.debug("Writer Schema From Parquet => " + writerSchema.getFields());
+    } else {
+      writerSchema = schemaFromLogFile;
+      LOG.debug("Writer Schema From Log => " + writerSchema.getFields());
     }
-
     // Add partitioning fields to writer schema for resulting row to contain 
null values for these fields
-
     String partitionFields = jobConf.get("partition_columns", "");
     List<String> partitioningFields =
         partitionFields.length() > 0 ? 
Arrays.stream(partitionFields.split(",")).collect(Collectors.toList())
             : new ArrayList<>();
     writerSchema = addPartitionFields(writerSchema, partitioningFields);
-
     List<String> projectionFields = orderFields(
         jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
         jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR),
@@ -347,7 +338,6 @@ public abstract class AbstractRealtimeRecordReader {
     // TODO(vc): In the future, the reader schema should be updated based on 
log files & be able
     // to null out fields not present before
     readerSchema = generateProjectionSchema(writerSchema, projectionFields);
-
     LOG.info(String.format("About to read compacted logs %s for base split %s, 
projecting cols %s",
         split.getDeltaFilePaths(), split.getPath(), projectionFields));
   }
diff --git 
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java
 
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java
index 6e323ec..1426373 100644
--- 
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java
+++ 
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -68,6 +69,15 @@ public class HoodieRealtimeInputFormat extends 
HoodieInputFormat implements Conf
   public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
   public static final int HOODIE_RECORD_KEY_COL_POS = 2;
   public static final int HOODIE_PARTITION_PATH_COL_POS = 3;
+  // Track the read column ids and names to be used throughout the execution 
and lifetime of this task
+  // Needed for Hive on Spark. Our theory is that due to
+  // {@link org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher}
+  // not handling empty list correctly, the ParquetRecordReaderWrapper ends up 
adding the same column ids multiple
+  // times which ultimately breaks the query.
+  // TODO : Find why RO view works fine but RT doesn't, JIRA: 
https://issues.apache.org/jira/browse/HUDI-151
+  public static String READ_COLUMN_IDS;
+  public static String READ_COLUMN_NAMES;
+  public static boolean isReadColumnsSet = false;
 
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException 
{
@@ -190,7 +200,7 @@ public class HoodieRealtimeInputFormat extends 
HoodieInputFormat implements Conf
     return conf;
   }
 
-  private static Configuration addRequiredProjectionFields(Configuration 
configuration) {
+  private static synchronized Configuration 
addRequiredProjectionFields(Configuration configuration) {
     // Need this to do merge records in HoodieRealtimeRecordReader
     configuration = addProjectionField(configuration, 
HoodieRecord.RECORD_KEY_METADATA_FIELD,
         HOODIE_RECORD_KEY_COL_POS);
@@ -198,11 +208,16 @@ public class HoodieRealtimeInputFormat extends 
HoodieInputFormat implements Conf
         HOODIE_COMMIT_TIME_COL_POS);
     configuration = addProjectionField(configuration, 
HoodieRecord.PARTITION_PATH_METADATA_FIELD,
         HOODIE_PARTITION_PATH_COL_POS);
+    if (!isReadColumnsSet) {
+      READ_COLUMN_IDS = 
configuration.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
+      READ_COLUMN_NAMES = 
configuration.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
+      isReadColumnsSet = true;
+    }
     return configuration;
   }
 
   @Override
-  public RecordReader<Void, ArrayWritable> getRecordReader(final InputSplit 
split,
+  public RecordReader<NullWritable, ArrayWritable> getRecordReader(final 
InputSplit split,
       final JobConf job, final Reporter reporter) throws IOException {
 
     LOG.info("Before adding Hoodie columns, Projections :" + job
@@ -225,6 +240,10 @@ public class HoodieRealtimeInputFormat extends 
HoodieInputFormat implements Conf
         "HoodieRealtimeRecordReader can only work on HoodieRealtimeFileSplit 
and not with "
             + split);
 
+    // Reset the original column ids and names
+    job.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, READ_COLUMN_IDS);
+    job.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, 
READ_COLUMN_NAMES);
+
     return new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) split, job,
         super.getRecordReader(split, job, reporter));
   }
diff --git 
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java
 
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java
index 6a515b8..61f267a 100644
--- 
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java
+++ 
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 
@@ -30,17 +31,17 @@ import org.apache.hadoop.mapred.RecordReader;
  * Realtime Record Reader which can do compacted (merge-on-read) record 
reading or
  * unmerged reading (parquet and log files read in parallel) based on job 
configuration.
  */
-public class HoodieRealtimeRecordReader implements RecordReader<Void, 
ArrayWritable> {
+public class HoodieRealtimeRecordReader implements RecordReader<NullWritable, 
ArrayWritable> {
 
   // Property to enable parallel reading of parquet and log files without 
merging.
   public static final String REALTIME_SKIP_MERGE_PROP = 
"hoodie.realtime.merge.skip";
   // By default, we do merged-reading
   public static final String DEFAULT_REALTIME_SKIP_MERGE = "false";
   public static final Log LOG = 
LogFactory.getLog(HoodieRealtimeRecordReader.class);
-  private final RecordReader<Void, ArrayWritable> reader;
+  private final RecordReader<NullWritable, ArrayWritable> reader;
 
   public HoodieRealtimeRecordReader(HoodieRealtimeFileSplit split, JobConf job,
-      RecordReader<Void, ArrayWritable> realReader) {
+      RecordReader<NullWritable, ArrayWritable> realReader) {
     this.reader = constructRecordReader(split, job, realReader);
   }
 
@@ -56,8 +57,8 @@ public class HoodieRealtimeRecordReader implements 
RecordReader<Void, ArrayWrita
    * @param realReader Parquet Record Reader
    * @return Realtime Reader
    */
-  private static RecordReader<Void, ArrayWritable> 
constructRecordReader(HoodieRealtimeFileSplit split,
-      JobConf jobConf, RecordReader<Void, ArrayWritable> realReader) {
+  private static RecordReader<NullWritable, ArrayWritable> 
constructRecordReader(HoodieRealtimeFileSplit split,
+      JobConf jobConf, RecordReader<NullWritable, ArrayWritable> realReader) {
     try {
       if (canSkipMerging(jobConf)) {
         LOG.info("Enabling un-merged reading of realtime records");
@@ -71,12 +72,12 @@ public class HoodieRealtimeRecordReader implements 
RecordReader<Void, ArrayWrita
   }
 
   @Override
-  public boolean next(Void key, ArrayWritable value) throws IOException {
+  public boolean next(NullWritable key, ArrayWritable value) throws 
IOException {
     return this.reader.next(key, value);
   }
 
   @Override
-  public Void createKey() {
+  public NullWritable createKey() {
     return this.reader.createKey();
   }
 
diff --git 
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeCompactedRecordReader.java
 
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeCompactedRecordReader.java
index 6181df2..85efe6f 100644
--- 
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ 
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -22,68 +22,50 @@ import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
 import com.uber.hoodie.common.table.log.HoodieMergedLogRecordScanner;
 import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.HoodieAvroUtils;
 import java.io.IOException;
-import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 
 class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader 
implements
-    RecordReader<Void, ArrayWritable> {
+    RecordReader<NullWritable, ArrayWritable> {
 
-  protected final RecordReader<Void, ArrayWritable> parquetReader;
-  private final HashMap<String, ArrayWritable> deltaRecordMap;
+  protected final RecordReader<NullWritable, ArrayWritable> parquetReader;
+  private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> 
deltaRecordMap;
 
   public RealtimeCompactedRecordReader(HoodieRealtimeFileSplit split, JobConf 
job,
-      RecordReader<Void, ArrayWritable> realReader) throws IOException {
+      RecordReader<NullWritable, ArrayWritable> realReader) throws IOException 
{
     super(split, job);
     this.parquetReader = realReader;
-    this.deltaRecordMap = new HashMap<>();
-    readAndCompactLog();
+    this.deltaRecordMap = getMergedLogRecordScanner().getRecords();
   }
 
   /**
    * Goes through the log files and populates a map with latest version of 
each key logged, since
    * the base split was written.
    */
-  private void readAndCompactLog() throws IOException {
-    HoodieMergedLogRecordScanner compactedLogRecordScanner = new 
HoodieMergedLogRecordScanner(
+  private HoodieMergedLogRecordScanner getMergedLogRecordScanner() throws 
IOException {
+    // NOTE: HoodieMergedLogRecordScanner will not return records for an in-flight commit
+    // but can return records for completed commits > the commit we are trying to read (if using
+    // readCommit() API)
+    return new HoodieMergedLogRecordScanner(
         FSUtils.getFs(split.getPath().toString(), jobConf), 
split.getBasePath(),
-        split.getDeltaFilePaths(), getReaderSchema(), 
split.getMaxCommitTime(), getMaxCompactionMemoryInBytes(),
+        split.getDeltaFilePaths(), usesCustomPayload ? getWriterSchema() : 
getReaderSchema(), split.getMaxCommitTime(),
+        getMaxCompactionMemoryInBytes(),
         Boolean.valueOf(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
             DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
         false, jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE_PROP, 
DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
         jobConf.get(SPILLABLE_MAP_BASE_PATH_PROP, 
DEFAULT_SPILLABLE_MAP_BASE_PATH));
-    // NOTE: HoodieCompactedLogRecordScanner will not return records for an 
in-flight commit
-    // but can return records for completed commits > the commit we are trying 
to read (if using
-    // readCommit() API)
-    for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : 
compactedLogRecordScanner) {
-      Optional<IndexedRecord> recordOptional = 
hoodieRecord.getData().getInsertValue(getReaderSchema());
-      ArrayWritable aWritable;
-      String key = hoodieRecord.getRecordKey();
-      if (recordOptional.isPresent()) {
-        GenericRecord rec = (GenericRecord) recordOptional.get();
-        // we assume, a later safe record in the log, is newer than what we 
have in the map &
-        // replace it.
-        // TODO : handle deletes here
-        aWritable = (ArrayWritable) avroToArrayWritable(rec, 
getWriterSchema());
-        deltaRecordMap.put(key, aWritable);
-      } else {
-        aWritable = new ArrayWritable(Writable.class, new Writable[0]);
-        deltaRecordMap.put(key, aWritable);
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Log record : " + arrayWritableToString(aWritable));
-      }
-    }
   }
 
   @Override
-  public boolean next(Void aVoid, ArrayWritable arrayWritable) throws 
IOException {
+  public boolean next(NullWritable aVoid, ArrayWritable arrayWritable) throws 
IOException {
     // Call the underlying parquetReader.next - which may replace the passed 
in ArrayWritable
     // with a new block of values
     boolean result = this.parquetReader.next(aVoid, arrayWritable);
@@ -96,18 +78,33 @@ class RealtimeCompactedRecordReader extends 
AbstractRealtimeRecordReader impleme
       // return from delta records map if we have some match.
       String key = 
arrayWritable.get()[HoodieRealtimeInputFormat.HOODIE_RECORD_KEY_COL_POS]
           .toString();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format("key %s, base values: %s, log values: %s", key,
-            arrayWritableToString(arrayWritable), 
arrayWritableToString(deltaRecordMap.get(key))));
-      }
       if (deltaRecordMap.containsKey(key)) {
         // TODO(NA): Invoke preCombine here by converting arrayWritable to 
Avro. This is required since the
         // deltaRecord may not be a full record and needs values of columns 
from the parquet
-        Writable[] replaceValue = deltaRecordMap.get(key).get();
-        if (replaceValue.length < 1) {
-          // This record has been deleted, move to the next record
+        Optional<GenericRecord> rec;
+        if (usesCustomPayload) {
+          rec = 
deltaRecordMap.get(key).getData().getInsertValue(getWriterSchema());
+        } else {
+          rec = 
deltaRecordMap.get(key).getData().getInsertValue(getReaderSchema());
+        }
+        if (!rec.isPresent()) {
+          // If the record is not present, this is a delete record using an 
empty payload so skip this base record
+          // and move to the next record
           return next(aVoid, arrayWritable);
         }
+        GenericRecord recordToReturn = rec.get();
+        if (usesCustomPayload) {
+          // If using a custom payload, return only the projection fields
+          recordToReturn = 
HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(rec.get(), 
getReaderSchema());
+        }
+        // We assume a later safe record in the log is newer than what we have in the map, and
+        // replace it.
+        ArrayWritable aWritable = (ArrayWritable) 
avroToArrayWritable(recordToReturn, getWriterSchema());
+        Writable[] replaceValue = aWritable.get();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("key %s, base values: %s, log values: %s", 
key,
+              arrayWritableToString(arrayWritable), 
arrayWritableToString(aWritable)));
+        }
         Writable[] originalValue = arrayWritable.get();
         try {
           System.arraycopy(replaceValue, 0, originalValue, 0, 
originalValue.length);
@@ -115,7 +112,7 @@ class RealtimeCompactedRecordReader extends 
AbstractRealtimeRecordReader impleme
         } catch (RuntimeException re) {
           LOG.error("Got exception when doing array copy", re);
           LOG.error("Base record :" + arrayWritableToString(arrayWritable));
-          LOG.error("Log record :" + 
arrayWritableToString(deltaRecordMap.get(key)));
+          LOG.error("Log record :" + arrayWritableToString(aWritable));
           throw re;
         }
       }
@@ -124,7 +121,7 @@ class RealtimeCompactedRecordReader extends 
AbstractRealtimeRecordReader impleme
   }
 
   @Override
-  public Void createKey() {
+  public NullWritable createKey() {
     return parquetReader.createKey();
   }
 
diff --git 
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeUnmergedRecordReader.java
 
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeUnmergedRecordReader.java
index 861e01f..da09945 100644
--- 
a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeUnmergedRecordReader.java
+++ 
b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/RealtimeUnmergedRecordReader.java
@@ -34,20 +34,21 @@ import java.util.List;
 import java.util.Optional;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 
 class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader 
implements
-    RecordReader<Void, ArrayWritable> {
+    RecordReader<NullWritable, ArrayWritable> {
 
   // Log Record unmerged scanner
   private final HoodieUnMergedLogRecordScanner logRecordScanner;
 
   // Parquet record reader
-  private final RecordReader<Void, ArrayWritable> parquetReader;
+  private final RecordReader<NullWritable, ArrayWritable> parquetReader;
 
   // Parquet record iterator wrapper for the above reader
-  private final RecordReaderValueIterator<Void, ArrayWritable> 
parquetRecordsIterator;
+  private final RecordReaderValueIterator<NullWritable, ArrayWritable> 
parquetRecordsIterator;
 
   // Executor that runs the above producers in parallel
   private final BoundedInMemoryExecutor<ArrayWritable, ArrayWritable, ?> 
executor;
@@ -64,7 +65,7 @@ class RealtimeUnmergedRecordReader extends 
AbstractRealtimeRecordReader implemen
    * @param realReader Parquet Reader
    */
   public RealtimeUnmergedRecordReader(HoodieRealtimeFileSplit split, JobConf 
job,
-      RecordReader<Void, ArrayWritable> realReader) {
+      RecordReader<NullWritable, ArrayWritable> realReader) {
     super(split, job);
     this.parquetReader = new SafeParquetRecordReaderWrapper(realReader);
     // Iterator for consuming records from parquet file
@@ -103,7 +104,7 @@ class RealtimeUnmergedRecordReader extends 
AbstractRealtimeRecordReader implemen
   }
 
   @Override
-  public boolean next(Void key, ArrayWritable value) throws IOException {
+  public boolean next(NullWritable key, ArrayWritable value) throws 
IOException {
     if (!iterator.hasNext()) {
       return false;
     }
@@ -113,7 +114,7 @@ class RealtimeUnmergedRecordReader extends 
AbstractRealtimeRecordReader implemen
   }
 
   @Override
-  public Void createKey() {
+  public NullWritable createKey() {
     return parquetReader.createKey();
   }
 
diff --git 
a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java
 
b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java
index 93c38b2..dbda1ed 100644
--- 
a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java
+++ 
b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/HoodieInputFormatTest.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -214,9 +215,9 @@ public class HoodieInputFormatTest {
     int totalCount = 0;
     InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
     for (InputSplit split : splits) {
-      RecordReader<Void, ArrayWritable> recordReader = inputFormat
+      RecordReader<NullWritable, ArrayWritable> recordReader = inputFormat
           .getRecordReader(split, jobConf, null);
-      Void key = recordReader.createKey();
+      NullWritable key = recordReader.createKey();
       ArrayWritable writable = recordReader.createValue();
 
       while (recordReader.next(key, writable)) {
diff --git 
a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java
 
b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java
index 57272ff..2fa0705 100644
--- 
a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java
+++ 
b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java
@@ -20,7 +20,9 @@ package com.uber.hoodie.hadoop;
 
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieTestUtils;
+import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
 import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.HoodieAvroUtils;
 import com.uber.hoodie.common.util.SchemaTestUtil;
 import java.io.File;
 import java.io.FilenameFilter;
@@ -29,8 +31,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.parquet.avro.AvroParquetWriter;
@@ -79,6 +83,11 @@ public class InputFormatTestUtil {
     new File(basePath.getRoot().toString() + "/.hoodie/", commitNumber + 
".commit").createNewFile();
   }
 
+  public static void deltaCommit(TemporaryFolder basePath, String 
commitNumber) throws IOException {
+    // create the delta commit file under .hoodie
+    new File(basePath.getRoot().toString() + "/.hoodie/", commitNumber + 
".deltacommit").createNewFile();
+  }
+
   public static void setupIncremental(JobConf jobConf, String startCommit,
       int numberOfCommitsToPull) {
     String modePropertyName = String
@@ -107,6 +116,16 @@ public class InputFormatTestUtil {
     return partitionPath;
   }
 
+
+  public static File prepareSimpleParquetDataset(TemporaryFolder basePath, 
Schema schema,
+      int numberOfFiles, int numberOfRecords, String commitNumber) throws 
Exception {
+    basePath.create();
+    HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), 
basePath.getRoot().toString());
+    File partitionPath = basePath.newFolder("2016", "05", "01");
+    createSimpleData(schema, partitionPath, numberOfFiles, numberOfRecords, 
commitNumber);
+    return partitionPath;
+  }
+
   public static File prepareNonPartitionedParquetDataset(TemporaryFolder 
baseDir, Schema schema,
       int numberOfFiles, int numberOfRecords, String commitNumber) throws 
IOException {
     baseDir.create();
@@ -135,6 +154,31 @@ public class InputFormatTestUtil {
     }
   }
 
+  private static void createSimpleData(Schema schema,
+      File partitionPath,  int numberOfFiles, int numberOfRecords, String 
commitNumber)
+      throws Exception {
+    AvroParquetWriter parquetWriter;
+    for (int i = 0; i < numberOfFiles; i++) {
+      String fileId = FSUtils.makeDataFileName(commitNumber, "1", "fileid" + 
i);
+      File dataFile = new File(partitionPath, fileId);
+      parquetWriter = new AvroParquetWriter(new 
Path(dataFile.getAbsolutePath()), schema);
+      try {
+        List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 
numberOfRecords);
+        String commitTime = HoodieActiveTimeline.createNewCommitTime();
+        Schema hoodieFieldsSchema = HoodieAvroUtils.addMetadataFields(schema);
+        for (IndexedRecord record : records) {
+          GenericRecord p = HoodieAvroUtils.rewriteRecord((GenericRecord) 
record, hoodieFieldsSchema);
+          p.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, 
UUID.randomUUID().toString());
+          p.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, "0000/00/00");
+          p.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitNumber);
+          parquetWriter.write(p);
+        }
+      } finally {
+        parquetWriter.close();
+      }
+    }
+  }
+
   private static Iterable<? extends GenericRecord> generateAvroRecords(Schema 
schema,
       int numberOfRecords, String commitTime, String fileId) throws 
IOException {
     List<GenericRecord> records = new ArrayList<>(numberOfRecords);
diff --git 
a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
 
b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
index c9ffa80..dc268d6 100644
--- 
a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
+++ 
b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
@@ -48,6 +48,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -60,6 +61,7 @@ import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
@@ -91,7 +93,7 @@ public class HoodieRealtimeRecordReaderTest {
   private HoodieLogFormat.Writer writeLogFile(File partitionDir, Schema 
schema, String fileId,
       String baseCommit, String newCommit, int numberOfRecords)
       throws InterruptedException, IOException {
-    return writeLogFile(partitionDir, schema, fileId, baseCommit, newCommit, 
numberOfRecords, 0, 0);
+    return writeDataBlockToLogFile(partitionDir, schema, fileId, baseCommit, 
newCommit, numberOfRecords, 0, 0);
   }
 
   private HoodieLogFormat.Writer writeRollback(File partitionDir, Schema 
schema, String fileId,
@@ -115,7 +117,7 @@ public class HoodieRealtimeRecordReaderTest {
     return writer;
   }
 
-  private HoodieLogFormat.Writer writeLogFile(File partitionDir, Schema 
schema, String fileId,
+  private HoodieLogFormat.Writer writeDataBlockToLogFile(File partitionDir, 
Schema schema, String fileId,
       String baseCommit, String newCommit, int numberOfRecords, int offset, 
int logVersion)
       throws InterruptedException, IOException {
     HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
@@ -137,6 +139,25 @@ public class HoodieRealtimeRecordReaderTest {
     return writer;
   }
 
+  private HoodieLogFormat.Writer writeRollbackBlockToLogFile(File 
partitionDir, Schema schema, String fileId,
+      String baseCommit, String newCommit, String oldCommit, int logVersion)
+      throws InterruptedException, IOException {
+    HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
+        .onParentPath(new Path(partitionDir.getPath()))
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId)
+        
.overBaseCommit(baseCommit).withLogVersion(logVersion).withFs(fs).build();
+
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit);
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+    header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, 
oldCommit);
+    header.put(HeaderMetadataType.COMMAND_BLOCK_TYPE, 
String.valueOf(HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK
+        .ordinal()));
+    HoodieCommandBlock rollbackBlock = new HoodieCommandBlock(header);
+    writer = writer.appendBlock(rollbackBlock);
+    return writer;
+  }
+
   @Test
   public void testReader() throws Exception {
     testReader(true);
@@ -155,7 +176,7 @@ public class HoodieRealtimeRecordReaderTest {
     String baseInstant = "100";
     File partitionDir =
         partitioned ? InputFormatTestUtil.prepareParquetDataset(basePath, 
schema, 1, 100, baseInstant)
-        : InputFormatTestUtil.prepareNonPartitionedParquetDataset(basePath, 
schema, 1, 100, baseInstant);
+            : 
InputFormatTestUtil.prepareNonPartitionedParquetDataset(basePath, schema, 1, 
100, baseInstant);
     InputFormatTestUtil.commit(basePath, baseInstant);
     // Add the paths
     FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
@@ -183,7 +204,7 @@ public class HoodieRealtimeRecordReaderTest {
           writer = writeRollback(partitionDir, schema, "fileid0", baseInstant,
               instantTime, String.valueOf(baseInstantTs + logVersion - 1), 
logVersion);
         } else {
-          writer = writeLogFile(partitionDir, schema, "fileid0", baseInstant,
+          writer = writeDataBlockToLogFile(partitionDir, schema, "fileid0", 
baseInstant,
               instantTime, 100, 0, logVersion);
         }
         long size = writer.getCurrentSize();
@@ -199,7 +220,7 @@ public class HoodieRealtimeRecordReaderTest {
                 .collect(Collectors.toList()), instantTime);
 
         //create a RecordReader to be used by HoodieRealtimeRecordReader
-        RecordReader<Void, ArrayWritable> reader =
+        RecordReader<NullWritable, ArrayWritable> reader =
             new MapredParquetInputFormat().getRecordReader(
                 new FileSplit(split.getPath(), 0, 
fs.getLength(split.getPath()), (String[]) null),
                 jobConf, null);
@@ -219,7 +240,7 @@ public class HoodieRealtimeRecordReaderTest {
 
         //use reader to read base Parquet File and log file, merge in flight 
and return latest commit
         //here all 100 records should be updated, see above
-        Void key = recordReader.createKey();
+        NullWritable key = recordReader.createKey();
         ArrayWritable value = recordReader.createValue();
         while (recordReader.next(key, value)) {
           Writable[] values = value.get();
@@ -255,7 +276,7 @@ public class HoodieRealtimeRecordReaderTest {
 
     // insert new records to log file
     String newCommitTime = "101";
-    HoodieLogFormat.Writer writer = writeLogFile(partitionDir, schema, 
"fileid0", commitTime,
+    HoodieLogFormat.Writer writer = writeDataBlockToLogFile(partitionDir, 
schema, "fileid0", commitTime,
         newCommitTime, numRecords, numRecords, 0);
     long size = writer.getCurrentSize();
     writer.close();
@@ -268,7 +289,7 @@ public class HoodieRealtimeRecordReaderTest {
             jobConf), basePath.getRoot().getPath(), 
Arrays.asList(logFilePath), newCommitTime);
 
     //create a RecordReader to be used by HoodieRealtimeRecordReader
-    RecordReader<Void, ArrayWritable> reader =
+    RecordReader<NullWritable, ArrayWritable> reader =
         new MapredParquetInputFormat().getRecordReader(
             new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), 
(String[]) null),
             jobConf, null);
@@ -288,7 +309,7 @@ public class HoodieRealtimeRecordReaderTest {
 
     //use reader to read base Parquet File and log file
     //here all records should be present. Also ensure log records are in order.
-    Void key = recordReader.createKey();
+    NullWritable key = recordReader.createKey();
     ArrayWritable value = recordReader.createValue();
     int numRecordsAtCommit1 = 0;
     int numRecordsAtCommit2 = 0;
@@ -343,6 +364,7 @@ public class HoodieRealtimeRecordReaderTest {
     long size = writer.getCurrentSize();
     writer.close();
     assertTrue("block - size should be > 0", size > 0);
+    InputFormatTestUtil.deltaCommit(basePath, newCommitTime);
 
     //create a split with baseFile (parquet file written earlier) and new log 
file(s)
     String logFilePath = writer.getLogFile().getPath().toString();
@@ -351,7 +373,7 @@ public class HoodieRealtimeRecordReaderTest {
             jobConf), basePath.getRoot().getPath(), 
Arrays.asList(logFilePath), newCommitTime);
 
     //create a RecordReader to be used by HoodieRealtimeRecordReader
-    RecordReader<Void, ArrayWritable> reader =
+    RecordReader<NullWritable, ArrayWritable> reader =
         new MapredParquetInputFormat().getRecordReader(
             new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), 
(String[]) null),
             jobConf, null);
@@ -370,7 +392,7 @@ public class HoodieRealtimeRecordReaderTest {
 
     // use reader to read base Parquet File and log file, merge in flight and 
return latest commit
     // here the first 50 records should be updated, see above
-    Void key = recordReader.createKey();
+    NullWritable key = recordReader.createKey();
     ArrayWritable value = recordReader.createValue();
     int numRecordsRead = 0;
     while (recordReader.next(key, value)) {
@@ -420,26 +442,26 @@ public class HoodieRealtimeRecordReaderTest {
 
       // Assert type MAP
       ArrayWritable mapItem = (ArrayWritable) values[12];
-      Writable[] mapItemValues = ((ArrayWritable) mapItem.get()[0]).get();
-      ArrayWritable mapItemValue1 = (ArrayWritable) mapItemValues[0];
-      ArrayWritable mapItemValue2 = (ArrayWritable) mapItemValues[1];
-      Assert.assertEquals("test value for field: tags", 
mapItemValue1.get()[0].toString(),
+      Writable mapItemValue1 = mapItem.get()[0];
+      Writable mapItemValue2 = mapItem.get()[1];
+
+      Assert.assertEquals("test value for field: tags", ((ArrayWritable) 
mapItemValue1).get()[0].toString(),
           "mapItem1");
-      Assert.assertEquals("test value for field: tags", 
mapItemValue2.get()[0].toString(),
+      Assert.assertEquals("test value for field: tags", ((ArrayWritable) 
mapItemValue2).get()[0].toString(),
           "mapItem2");
-      ArrayWritable mapItemValue1value = (ArrayWritable) 
mapItemValue1.get()[1];
-      ArrayWritable mapItemValue2value = (ArrayWritable) 
mapItemValue2.get()[1];
-      Assert.assertEquals("test value for field: tags", 
mapItemValue1value.get().length, 2);
-      Assert.assertEquals("test value for field: tags", 
mapItemValue2value.get().length, 2);
+      Assert.assertEquals("test value for field: tags", ((ArrayWritable) 
mapItemValue1).get().length, 2);
+      Assert.assertEquals("test value for field: tags", ((ArrayWritable) 
mapItemValue2).get().length, 2);
+      Writable mapItemValue1value = ((ArrayWritable) mapItemValue1).get()[1];
+      Writable mapItemValue2value = ((ArrayWritable) mapItemValue2).get()[1];
       Assert.assertEquals("test value for field: tags[\"mapItem1\"].item1",
-          mapItemValue1value.get()[0].toString(), "item" + currentRecordNo);
+          ((ArrayWritable) mapItemValue1value).get()[0].toString(), "item" + 
currentRecordNo);
       Assert.assertEquals("test value for field: tags[\"mapItem2\"].item1",
-          mapItemValue2value.get()[0].toString(), "item2" + currentRecordNo);
+          ((ArrayWritable) mapItemValue2value).get()[0].toString(), "item2" + 
currentRecordNo);
       Assert.assertEquals("test value for field: tags[\"mapItem1\"].item2",
-          mapItemValue1value.get()[1].toString(),
+          ((ArrayWritable) mapItemValue1value).get()[1].toString(),
           "item" + currentRecordNo + recordCommitTimeSuffix);
       Assert.assertEquals("test value for field: tags[\"mapItem2\"].item2",
-          mapItemValue2value.get()[1].toString(),
+          ((ArrayWritable) mapItemValue2value).get()[1].toString(),
           "item2" + currentRecordNo + recordCommitTimeSuffix);
 
       // Assert type RECORD
@@ -453,11 +475,96 @@ public class HoodieRealtimeRecordReaderTest {
 
       // Assert type ARRAY
       ArrayWritable arrayValue = (ArrayWritable) values[14];
-      Writable[] arrayValues = ((ArrayWritable) arrayValue.get()[0]).get();
+      Writable[] arrayValues = arrayValue.get();
       for (int i = 0; i < arrayValues.length; i++) {
         Assert.assertEquals("test value for field: stringArray", "stringArray" 
+ i + recordCommitTimeSuffix,
             arrayValues[i].toString());
       }
     }
   }
-}
+
+  @Test
+  public void testSchemaEvolutionAndRollbackBlockInLastLogFile() throws 
Exception {
+    // initial commit
+    List<String> logFilePaths = new ArrayList<>();
+    Schema schema = 
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+    HoodieTestUtils.initTableType(hadoopConf, 
basePath.getRoot().getAbsolutePath(),
+        HoodieTableType.MERGE_ON_READ);
+    String commitTime = "100";
+    int numberOfRecords = 100;
+    int numberOfLogRecords = numberOfRecords / 2;
+    File partitionDir = InputFormatTestUtil
+        .prepareSimpleParquetDataset(basePath, schema, 1, numberOfRecords, 
commitTime);
+    InputFormatTestUtil.commit(basePath, commitTime);
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+    List<Field> firstSchemaFields = schema.getFields();
+
+    // update files and generate new log file but don't commit
+    schema = SchemaTestUtil.getComplexEvolvedSchema();
+    String newCommitTime = "101";
+    HoodieLogFormat.Writer writer = writeDataBlockToLogFile(partitionDir, 
schema, "fileid0", commitTime,
+        newCommitTime, numberOfLogRecords, 0, 1);
+    long size = writer.getCurrentSize();
+    logFilePaths.add(writer.getLogFile().getPath().toString());
+    writer.close();
+    assertTrue("block - size should be > 0", size > 0);
+
+    // write rollback for the previous block in new log file version
+    newCommitTime = "102";
+    writer = writeRollbackBlockToLogFile(partitionDir, schema, "fileid0", 
commitTime,
+        newCommitTime, "101", 1);
+    logFilePaths.add(writer.getLogFile().getPath().toString());
+    writer.close();
+    assertTrue("block - size should be > 0", size > 0);
+    InputFormatTestUtil.deltaCommit(basePath, newCommitTime);
+
+    //create a split with baseFile (parquet file written earlier) and new log file(s)
+    HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
+        new FileSplit(new Path(partitionDir + "/fileid0_1_" + commitTime + 
".parquet"), 0, 1,
+            jobConf), basePath.getRoot().getPath(), logFilePaths, 
newCommitTime);
+
+    //create a RecordReader to be used by HoodieRealtimeRecordReader
+    RecordReader<NullWritable, ArrayWritable> reader =
+        new MapredParquetInputFormat().getRecordReader(
+            new FileSplit(split.getPath(), 0, fs.getLength(split.getPath()), 
(String[]) null),
+            jobConf, null);
+    JobConf jobConf = new JobConf();
+    List<Schema.Field> fields = schema.getFields();
+
+    assert (firstSchemaFields.containsAll(fields) == false);
+
+    // Try to read all the fields passed by the new schema
+    String names = fields.stream().map(f -> 
f.name()).collect(Collectors.joining(","));
+    String positions = fields.stream().map(f -> String.valueOf(f.pos()))
+        .collect(Collectors.joining(","));
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
+    jobConf.set("partition_columns", "datestr");
+
+    HoodieRealtimeRecordReader recordReader = null;
+    try {
+      // validate record reader compaction
+      recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
+      throw new RuntimeException("should've failed the previous line");
+    } catch (HoodieException e) {
+      // expected, field not found since the data written with the evolved schema was rolled back
+    }
+
+    // Now read only the fields of the first schema, i.e. those present in the base parquet file
+    names = firstSchemaFields.stream().map(f -> 
f.name()).collect(Collectors.joining(","));
+    positions = firstSchemaFields.stream().map(f -> String.valueOf(f.pos()))
+        .collect(Collectors.joining(","));
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
+    jobConf.set("partition_columns", "datestr");
+    // This time read only the fields which are part of parquet
+    recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
+    // use reader to read base Parquet File and log file
+    NullWritable key = recordReader.createKey();
+    ArrayWritable value = recordReader.createValue();
+    while (recordReader.next(key, value)) {
+      // keep reading
+    }
+  }
+}
\ No newline at end of file
diff --git a/hoodie-hive/pom.xml b/hoodie-hive/pom.xml
index 01875a7..198b192 100644
--- a/hoodie-hive/pom.xml
+++ b/hoodie-hive/pom.xml
@@ -69,6 +69,11 @@
     </dependency>
 
     <dependency>
+      <groupId>commons-pool</groupId>
+      <artifactId>commons-pool</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
     </dependency>
@@ -107,6 +112,16 @@
       <groupId>${hive.groupid}</groupId>
       <artifactId>hive-service</artifactId>
       <version>${hive.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>${hive.groupid}</groupId>
diff --git 
a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java 
b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java
index 8d6e84f..c252007 100644
--- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java
+++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java
@@ -182,7 +182,10 @@ public class HoodieHiveClient {
 
   private List<String> constructChangePartitions(List<String> partitions) {
     List<String> changePartitions = Lists.newArrayList();
-    String alterTable = "ALTER TABLE " + syncConfig.databaseName + "." + 
syncConfig.tableName;
+    // Hive 2.x doesn't like db.table name for operations, hence we need to change to using the database first
+    String useDatabase = "USE " + syncConfig.databaseName;
+    changePartitions.add(useDatabase);
+    String alterTable = "ALTER TABLE " + syncConfig.tableName;
     for (String partition : partitions) {
       String partitionClause = getPartitionClause(partition);
       String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, 
partition).toString();
@@ -494,7 +497,7 @@ public class HoodieHiveClient {
     if (!hiveJdbcUrl.endsWith("/")) {
       hiveJdbcUrl = hiveJdbcUrl + "/";
     }
-    return hiveJdbcUrl + syncConfig.databaseName + (urlAppend == null ? "" : 
urlAppend);
+    return hiveJdbcUrl + (urlAppend == null ? "" : urlAppend);
   }
 
   private static void closeQuietly(ResultSet resultSet, Statement stmt) {
@@ -585,7 +588,7 @@ public class HoodieHiveClient {
     try {
       Table table = client.getTable(syncConfig.databaseName, 
syncConfig.tableName);
       table.putToParameters(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced);
-      client.alter_table(syncConfig.databaseName, syncConfig.tableName, table, 
true);
+      client.alter_table(syncConfig.databaseName, syncConfig.tableName, table);
     } catch (Exception e) {
       throw new HoodieHiveSyncException(
           "Failed to get update last commit time synced to " + 
lastCommitSynced, e);
diff --git 
a/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/HiveTestService.java 
b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/HiveTestService.java
index 2f2a9c8..2a7451f 100644
--- a/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/HiveTestService.java
+++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/util/HiveTestService.java
@@ -152,6 +152,9 @@ public class HiveTestService {
     derbyLogFile.createNewFile();
     setSystemProperty("derby.stream.error.file", derbyLogFile.getPath());
     conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, 
Files.createTempDir().getAbsolutePath());
+    conf.set("datanucleus.schema.autoCreateTables", "true");
+    conf.set("hive.metastore.schema.verification", "false");
+    setSystemProperty("derby.stream.error.file", derbyLogFile.getPath());
 
     return new HiveConf(conf, this.getClass());
   }
diff --git a/hoodie-utilities/pom.xml b/hoodie-utilities/pom.xml
index e325041..4179837 100644
--- a/hoodie-utilities/pom.xml
+++ b/hoodie-utilities/pom.xml
@@ -68,6 +68,12 @@
       <groupId>io.javalin</groupId>
       <artifactId>javalin</artifactId>
       <version>2.4.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <dependency>
@@ -108,6 +114,18 @@
       <groupId>com.uber.hoodie</groupId>
       <artifactId>hoodie-spark</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-server</artifactId>
+      <version>7.6.0.v20120127</version>
     </dependency>
 
     <dependency>
@@ -137,16 +155,8 @@
 
     <dependency>
       <groupId>${hive.groupid}</groupId>
-      <artifactId>hive-exec</artifactId>
-      <version>${hive.version}</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>${hive.groupid}</groupId>
       <artifactId>hive-jdbc</artifactId>
       <version>${hive.version}</version>
-      <classifier>standalone</classifier>
       <exclusions>
         <exclusion>
           <groupId>org.slf4j</groupId>
@@ -160,6 +170,19 @@
     </dependency>
 
     <dependency>
+      <groupId>${hive.groupid}</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${hive.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>${hive.groupid}</groupId>
+      <artifactId>hive-service</artifactId>
+      <version>${hive.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>com.uber.hoodie</groupId>
       <artifactId>hoodie-hive</artifactId>
       <version>${project.version}</version>
@@ -232,11 +255,23 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.11</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql_2.11</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <dependency>
diff --git a/pom.xml b/pom.xml
index 525a6b9..27d1bd8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -136,7 +136,7 @@
     <joda.version>2.9.9</joda.version>
     <hadoop.version>2.7.3</hadoop.version>
     <hive.groupid>org.apache.hive</hive.groupid>
-    <hive.version>1.2.1</hive.version>
+    <hive.version>2.3.1</hive.version>
     <metrics.version>4.0.2</metrics.version>
     <spark.version>2.1.0</spark.version>
     <avro.version>1.7.7</avro.version>
diff --git a/release/config/license-mappings.xml 
b/release/config/license-mappings.xml
index acf5de4..8666f1a 100644
--- a/release/config/license-mappings.xml
+++ b/release/config/license-mappings.xml
@@ -25,16 +25,31 @@
         <artifactId>servlet-api</artifactId>
         <license>CDDL</license>
     </artifact>
-    <artifact>
-        <groupId>javax.servlet.jsp</groupId>
-        <artifactId>jsp-api</artifactId>
-        <license>CDDL</license>
-    </artifact>
-    <artifact>
-        <groupId>javax.transaction</groupId>
-        <artifactId>jta</artifactId>
-        <license>OWN LICENSE (See 
http://download.oracle.com/otndocs/jcp/jta-1.1-classes-oth-JSpec/jta-1.1-classes-oth-JSpec-license.html)</license>
-    </artifact>
+  <artifact>
+    <groupId>javax.servlet</groupId>
+    <artifactId>jsp-api</artifactId>
+    <license>CDDL</license>
+  </artifact>
+  <artifact>
+    <groupId>javax.xml.stream</groupId>
+    <artifactId>stax-api</artifactId>
+    <license>CDDL</license>
+  </artifact>
+  <artifact>
+      <groupId>javax.servlet.jsp</groupId>
+      <artifactId>jsp-api</artifactId>
+      <license>CDDL</license>
+  </artifact>
+  <artifact>
+      <groupId>javax.transaction</groupId>
+      <artifactId>jta</artifactId>
+      <license>OWN LICENSE (See 
http://download.oracle.com/otndocs/jcp/jta-1.1-classes-oth-JSpec/jta-1.1-classes-oth-JSpec-license.html)</license>
+  </artifact>
+  <artifact>
+    <groupId>javax.transaction</groupId>
+    <artifactId>transaction-api</artifactId>
+    <license>OWN LICENSE (See 
http://download.oracle.com/otndocs/jcp/jta-1.1-classes-oth-JSpec/jta-1.1-classes-oth-JSpec-license.html)</license>
+  </artifact>
     <artifact>
         <groupId>jdk.tools</groupId>
         <artifactId>jdk.tools</artifactId>
@@ -90,4 +105,9 @@
         <artifactId>antlr-runtime</artifactId>
         <license>BSD</license>
     </artifact>
+  <artifact>
+    <groupId>xerces</groupId>
+    <artifactId>xercesImpl</artifactId>
+    <license>Apache License, Version 1.1</license>
+  </artifact>
 </license-lookup>

Reply via email to