This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b8fe5b9  [HUDI-764] [HUDI-765] ORC reader writer Implementation (#2999)
b8fe5b9 is described below

commit b8fe5b91d599418cd908d833fd63edc7f362c548
Author: Jintao Guan <jintao.g...@uber.com>
AuthorDate: Tue Jun 15 15:21:43 2021 -0700

    [HUDI-764] [HUDI-765] ORC reader writer Implementation (#2999)
    
    
    Co-authored-by: Qingyun (Teresa) Kang <kter...@uber.com>
---
 LICENSE                                            |  12 +
 NOTICE                                             |  12 +
 .../apache/hudi/config/HoodieStorageConfig.java    |  42 ++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  17 +
 .../apache/hudi/io/storage/HoodieFileWriter.java   |  10 +
 .../hudi/io/storage/HoodieFileWriterFactory.java   |  13 +
 .../apache/hudi/io/storage/HoodieHFileWriter.java  |  10 +-
 .../apache/hudi/io/storage/HoodieOrcConfig.java    |  72 ++
 .../apache/hudi/io/storage/HoodieOrcWriter.java    | 172 +++++
 .../hudi/io/storage/HoodieParquetWriter.java       |   9 +-
 .../java/org/apache/hudi/table/HoodieTable.java    |   1 +
 .../hudi/io/storage/TestHoodieOrcReaderWriter.java | 261 +++++++
 .../src/test/resources/exampleSchemaWithUDT.avsc   |  67 ++
 .../io/storage/TestHoodieFileWriterFactory.java    |   7 +
 hudi-common/pom.xml                                |   8 +
 .../apache/hudi/common/model/HoodieFileFormat.java |   3 +-
 .../org/apache/hudi/common/util/AvroOrcUtils.java  | 799 +++++++++++++++++++++
 .../org/apache/hudi/common/util/BaseFileUtils.java | 133 +++-
 .../apache/hudi/common/util/OrcReaderIterator.java | 118 +++
 .../java/org/apache/hudi/common/util/OrcUtils.java | 235 ++++++
 .../org/apache/hudi/common/util/ParquetUtils.java  |  60 +-
 .../hudi/io/storage/HoodieFileReaderFactory.java   |   8 +
 .../apache/hudi/io/storage/HoodieOrcReader.java    |  91 +++
 .../apache/hudi/common/util/TestAvroOrcUtils.java  |  76 ++
 .../hudi/common/util/TestOrcReaderIterator.java    |  92 +++
 .../io/storage/TestHoodieFileReaderFactory.java    |   7 +-
 .../hudi/hadoop/utils/HoodieInputFormatUtils.java  |   9 +
 .../main/scala/org/apache/hudi/DefaultSource.scala |  13 +-
 pom.xml                                            |   2 +
 29 files changed, 2268 insertions(+), 91 deletions(-)

diff --git a/LICENSE b/LICENSE
index 385191d..28222a7 100644
--- a/LICENSE
+++ b/LICENSE
@@ -333,3 +333,15 @@ Copyright (c) 2005, European Commission project OneLab under contract 034819 (ht
 
  Home page: https://commons.apache.org/proper/commons-lang/
  License: http://www.apache.org/licenses/LICENSE-2.0
+
+ -------------------------------------------------------------------------------
+
+ This product includes code from StreamSets Data Collector
+
+  * com.streamsets.pipeline.lib.util.avroorc.AvroToOrcRecordConverter copied and modified to org.apache.hudi.common.util.AvroOrcUtils
+  * com.streamsets.pipeline.lib.util.avroorc.AvroToOrcSchemaConverter copied and modified to org.apache.hudi.common.util.AvroOrcUtils
+
+  Copyright 2018 StreamSets Inc.
+
+  Home page: https://github.com/streamsets/datacollector-oss
+  License: http://www.apache.org/licenses/LICENSE-2.0
diff --git a/NOTICE b/NOTICE
index 2f1aee6..9b24933 100644
--- a/NOTICE
+++ b/NOTICE
@@ -147,3 +147,15 @@ its NOTICE file:
 
   This product includes software developed at
   The Apache Software Foundation (http://www.apache.org/).
+
+--------------------------------------------------------------------------------
+
+This product includes code from StreamSets Data Collector, which includes the following in
+its NOTICE file:
+
+  StreamSets datacollector-oss
+  Copyright 2018 StreamSets Inc.
+
+  This product includes software developed at
+  StreamSets (http://www.streamsets.com/).
+
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java
index 50b45f3..3cd8817 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java
@@ -39,10 +39,21 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
   public static final String DEFAULT_PARQUET_BLOCK_SIZE_BYTES = DEFAULT_PARQUET_FILE_MAX_BYTES;
   public static final String PARQUET_PAGE_SIZE_BYTES = "hoodie.parquet.page.size";
   public static final String DEFAULT_PARQUET_PAGE_SIZE_BYTES = String.valueOf(1 * 1024 * 1024);
+
   public static final String HFILE_FILE_MAX_BYTES = "hoodie.hfile.max.file.size";
   public static final String HFILE_BLOCK_SIZE_BYTES = "hoodie.hfile.block.size";
   public static final String DEFAULT_HFILE_BLOCK_SIZE_BYTES = String.valueOf(1 * 1024 * 1024);
   public static final String DEFAULT_HFILE_FILE_MAX_BYTES = String.valueOf(120 * 1024 * 1024);
+
+  public static final String ORC_FILE_MAX_BYTES = "hoodie.orc.max.file.size";
+  public static final String DEFAULT_ORC_FILE_MAX_BYTES = String.valueOf(120 * 1024 * 1024);
+  // size of the memory buffer in bytes for writing
+  public static final String ORC_STRIPE_SIZE = "hoodie.orc.stripe.size";
+  public static final String DEFAULT_ORC_STRIPE_SIZE = String.valueOf(64 * 1024 * 1024);
+  // file system block size
+  public static final String ORC_BLOCK_SIZE = "hoodie.orc.block.size";
+  public static final String DEFAULT_ORC_BLOCK_SIZE = DEFAULT_ORC_FILE_MAX_BYTES;
+
   // used to size log files
   public static final String LOGFILE_SIZE_MAX_BYTES = "hoodie.logfile.max.size";
   public static final String DEFAULT_LOGFILE_SIZE_MAX_BYTES = String.valueOf(1024 * 1024 * 1024); // 1 GB
@@ -54,9 +65,11 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
   public static final String DEFAULT_STREAM_COMPRESSION_RATIO = String.valueOf(0.1);
   public static final String PARQUET_COMPRESSION_CODEC = "hoodie.parquet.compression.codec";
   public static final String HFILE_COMPRESSION_ALGORITHM = "hoodie.hfile.compression.algorithm";
+  public static final String ORC_COMPRESSION_CODEC = "hoodie.orc.compression.codec";
   // Default compression codec for parquet
   public static final String DEFAULT_PARQUET_COMPRESSION_CODEC = "gzip";
   public static final String DEFAULT_HFILE_COMPRESSION_ALGORITHM = "GZ";
+  public static final String DEFAULT_ORC_COMPRESSION_CODEC = "ZLIB";
   public static final String LOGFILE_TO_PARQUET_COMPRESSION_RATIO = "hoodie.logfile.to.parquet.compression.ratio";
   // Default compression ratio for log file to parquet, general 3x
   public static final String DEFAULT_LOGFILE_TO_PARQUET_COMPRESSION_RATIO = String.valueOf(0.35);
@@ -140,6 +153,26 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder orcMaxFileSize(long maxFileSize) {
+      props.setProperty(ORC_FILE_MAX_BYTES, String.valueOf(maxFileSize));
+      return this;
+    }
+
+    public Builder orcStripeSize(int orcStripeSize) {
+      props.setProperty(ORC_STRIPE_SIZE, String.valueOf(orcStripeSize));
+      return this;
+    }
+
+    public Builder orcBlockSize(int orcBlockSize) {
+      props.setProperty(ORC_BLOCK_SIZE, String.valueOf(orcBlockSize));
+      return this;
+    }
+
+    public Builder orcCompressionCodec(String orcCompressionCodec) {
+      props.setProperty(ORC_COMPRESSION_CODEC, orcCompressionCodec);
+      return this;
+    }
+
     public HoodieStorageConfig build() {
       HoodieStorageConfig config = new HoodieStorageConfig(props);
       setDefaultOnCondition(props, !props.containsKey(PARQUET_FILE_MAX_BYTES), PARQUET_FILE_MAX_BYTES,
@@ -166,6 +199,15 @@ public class HoodieStorageConfig extends DefaultHoodieConfig {
       setDefaultOnCondition(props, !props.containsKey(HFILE_FILE_MAX_BYTES), HFILE_FILE_MAX_BYTES,
           DEFAULT_HFILE_FILE_MAX_BYTES);
 
+      setDefaultOnCondition(props, !props.containsKey(ORC_FILE_MAX_BYTES), ORC_FILE_MAX_BYTES,
+          DEFAULT_ORC_FILE_MAX_BYTES);
+      setDefaultOnCondition(props, !props.containsKey(ORC_STRIPE_SIZE), ORC_STRIPE_SIZE,
+          DEFAULT_ORC_STRIPE_SIZE);
+      setDefaultOnCondition(props, !props.containsKey(ORC_BLOCK_SIZE), ORC_BLOCK_SIZE,
+          DEFAULT_ORC_BLOCK_SIZE);
+      setDefaultOnCondition(props, !props.containsKey(ORC_COMPRESSION_CODEC), ORC_COMPRESSION_CODEC,
+          DEFAULT_ORC_COMPRESSION_CODEC);
+
       return config;
     }
   }
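
For reference, a minimal sketch of wiring the new ORC options together through the builder added above; it assumes the existing HoodieStorageConfig.newBuilder() entry point and simply mirrors the defaults, so the exact values are illustrative only and not part of this commit:

    // Hypothetical usage of the ORC options introduced in this commit.
    HoodieStorageConfig storageConfig = HoodieStorageConfig.newBuilder()
        .orcMaxFileSize(120 * 1024 * 1024)   // hoodie.orc.max.file.size
        .orcStripeSize(64 * 1024 * 1024)     // hoodie.orc.stripe.size (writer memory buffer)
        .orcBlockSize(120 * 1024 * 1024)     // hoodie.orc.block.size (file system block size)
        .orcCompressionCodec("ZLIB")         // hoodie.orc.compression.codec
        .build();
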
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index cf5ac5c..9e89e0e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -42,6 +42,7 @@ import org.apache.hudi.metrics.MetricsReporterType;
 import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+import org.apache.orc.CompressionKind;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
 import javax.annotation.concurrent.Immutable;
@@ -784,6 +785,22 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Compression.Algorithm.valueOf(props.getProperty(HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM));
   }
 
+  public long getOrcMaxFileSize() {
+    return Long.parseLong(props.getProperty(HoodieStorageConfig.ORC_FILE_MAX_BYTES));
+  }
+
+  public int getOrcStripeSize() {
+    return Integer.parseInt(props.getProperty(HoodieStorageConfig.ORC_STRIPE_SIZE));
+  }
+
+  public int getOrcBlockSize() {
+    return Integer.parseInt(props.getProperty(HoodieStorageConfig.ORC_BLOCK_SIZE));
+  }
+
+  public CompressionKind getOrcCompressionCodec() {
+    return CompressionKind.valueOf(props.getProperty(HoodieStorageConfig.ORC_COMPRESSION_CODEC));
+  }
+
   /**
    * metrics properties.
    */
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
index 1aaa389..a579234 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
@@ -18,6 +18,9 @@
 
 package org.apache.hudi.io.storage;
 
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieRecord;
 
 import org.apache.avro.generic.IndexedRecord;
@@ -35,4 +38,11 @@ public interface HoodieFileWriter<R extends IndexedRecord> {
   void writeAvro(String key, R oldRecord) throws IOException;
 
   long getBytesWritten();
+
+  default void prepRecordWithMetadata(R avroRecord, HoodieRecord record, String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) {
+    String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex.getAndIncrement());
+    HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName);
+    HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
+    return;
+  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index 23701b0..96f19ca 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -34,6 +34,7 @@ import org.apache.parquet.avro.AvroSchemaConverter;
 
 import java.io.IOException;
 
+import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
 import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
 
@@ -49,6 +50,9 @@ public class HoodieFileWriterFactory {
     if (HFILE.getFileExtension().equals(extension)) {
       return newHFileFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier);
     }
+    if (ORC.getFileExtension().equals(extension)) {
+      return newOrcFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier);
+    }
     throw new UnsupportedOperationException(extension + " format not supported yet.");
   }
 
@@ -77,6 +81,15 @@ public class HoodieFileWriterFactory {
     return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, taskContextSupplier);
   }
 
+  private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newOrcFileWriter(
+      String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable,
+      TaskContextSupplier taskContextSupplier) throws IOException {
+    BloomFilter filter = createBloomFilter(config);
+    HoodieOrcConfig orcConfig = new HoodieOrcConfig(hoodieTable.getHadoopConf(), config.getOrcCompressionCodec(),
+        config.getOrcStripeSize(), config.getOrcBlockSize(), config.getOrcMaxFileSize(), filter);
+    return new HoodieOrcWriter<>(instantTime, path, orcConfig, schema, taskContextSupplier);
+  }
+
   private static BloomFilter createBloomFilter(HoodieWriteConfig config) {
     return BloomFilterFactory.createBloomFilter(config.getBloomFilterNumEntries(), config.getBloomFilterFPP(),
             config.getDynamicBloomFilterMaxNumEntries(),
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
index 352c51c..6747c4a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
@@ -99,13 +99,9 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
 
   @Override
   public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
-    String seqId =
-        HoodieRecord.generateSequenceId(instantTime, taskContextSupplier.getPartitionIdSupplier().get(), recordIndex.getAndIncrement());
-    HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(),
-        file.getName());
-    HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
-
-    writeAvro(record.getRecordKey(), (IndexedRecord)avroRecord);
+    prepRecordWithMetadata(avroRecord, record, instantTime,
+        taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
+    writeAvro(record.getRecordKey(), (IndexedRecord) avroRecord);
   }
 
   @Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java
new file mode 100644
index 0000000..c45e024
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.orc.CompressionKind;
+
+public class HoodieOrcConfig {
+  static final String AVRO_SCHEMA_METADATA_KEY = "orc.avro.schema";
+
+  private final CompressionKind compressionKind;
+  private final int stripeSize;
+  private final int blockSize;
+  private final long maxFileSize;
+  private final Configuration hadoopConf;
+  private final BloomFilter bloomFilter;
+
+  public HoodieOrcConfig(Configuration hadoopConf, CompressionKind compressionKind, int stripeSize,
+      int blockSize, long maxFileSize, BloomFilter bloomFilter) {
+    this.hadoopConf = hadoopConf;
+    this.compressionKind = compressionKind;
+    this.stripeSize = stripeSize;
+    this.blockSize = blockSize;
+    this.maxFileSize = maxFileSize;
+    this.bloomFilter = bloomFilter;
+  }
+
+  public Configuration getHadoopConf() {
+    return hadoopConf;
+  }
+
+  public CompressionKind getCompressionKind() {
+    return compressionKind;
+  }
+
+  public int getStripeSize() {
+    return stripeSize;
+  }
+
+  public int getBlockSize() {
+    return blockSize;
+  }
+
+  public long getMaxFileSize() {
+    return maxFileSize;
+  }
+
+  public boolean useBloomFilter() {
+    return bloomFilter != null;
+  }
+
+  public BloomFilter getBloomFilter() {
+    return bloomFilter;
+  }
+}
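
A minimal sketch of building this config by hand, using the same defaults as HoodieStorageConfig and the same bloom filter parameters as the test added in this commit; in the actual write path HoodieFileWriterFactory derives these values from HoodieWriteConfig instead, so the literal values here are illustrative:

    // Hypothetical direct construction of HoodieOrcConfig.
    Configuration hadoopConf = new Configuration();
    BloomFilter filter = BloomFilterFactory.createBloomFilter(
        1000, 0.00001, -1, BloomFilterTypeCode.SIMPLE.name());
    HoodieOrcConfig orcConfig = new HoodieOrcConfig(
        hadoopConf,
        CompressionKind.ZLIB,    // DEFAULT_ORC_COMPRESSION_CODEC
        64 * 1024 * 1024,        // stripe size
        120 * 1024 * 1024,       // block size
        120 * 1024 * 1024,       // max file size
        filter);
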
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java
new file mode 100644
index 0000000..f076842
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.AvroOrcUtils;
+
+public class HoodieOrcWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
+    implements HoodieFileWriter<R> {
+  private static final AtomicLong RECORD_INDEX = new AtomicLong(1);
+
+  private final long maxFileSize;
+  private final Schema avroSchema;
+  private final List<TypeDescription> fieldTypes;
+  private final List<String> fieldNames;
+  private final VectorizedRowBatch batch;
+  private final Writer writer;
+
+  private final Path file;
+  private final HoodieWrapperFileSystem fs;
+  private final String instantTime;
+  private final TaskContextSupplier taskContextSupplier;
+
+  private HoodieOrcConfig orcConfig;
+  private String minRecordKey;
+  private String maxRecordKey;
+
+  public HoodieOrcWriter(String instantTime, Path file, HoodieOrcConfig config, Schema schema,
+      TaskContextSupplier taskContextSupplier) throws IOException {
+
+    Configuration conf = FSUtils.registerFileSystem(file, config.getHadoopConf());
+    this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf);
+    this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(conf);
+    this.instantTime = instantTime;
+    this.taskContextSupplier = taskContextSupplier;
+
+    this.avroSchema = schema;
+    final TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(avroSchema);
+    this.fieldTypes = orcSchema.getChildren();
+    this.fieldNames = orcSchema.getFieldNames();
+    this.maxFileSize = config.getMaxFileSize();
+    this.batch = orcSchema.createRowBatch();
+    OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf)
+        .blockSize(config.getBlockSize())
+        .stripeSize(config.getStripeSize())
+        .compress(config.getCompressionKind())
+        .bufferSize(config.getBlockSize())
+        .fileSystem(fs)
+        .setSchema(orcSchema);
+    this.writer = OrcFile.createWriter(this.file, writerOptions);
+    this.orcConfig = config;
+  }
+
+  @Override
+  public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
+    prepRecordWithMetadata(avroRecord, record, instantTime,
+        taskContextSupplier.getPartitionIdSupplier().get(), RECORD_INDEX, file.getName());
+    writeAvro(record.getRecordKey(), avroRecord);
+  }
+
+  @Override
+  public boolean canWrite() {
+    return fs.getBytesWritten(file) < maxFileSize;
+  }
+
+  @Override
+  public void writeAvro(String recordKey, IndexedRecord object) throws IOException {
+    for (int col = 0; col < batch.numCols; col++) {
+      ColumnVector colVector = batch.cols[col];
+      final String thisField = fieldNames.get(col);
+      final TypeDescription type = fieldTypes.get(col);
+
+      Object fieldValue = ((GenericRecord) object).get(thisField);
+      Schema.Field avroField = avroSchema.getField(thisField);
+      AvroOrcUtils.addToVector(type, colVector, avroField.schema(), fieldValue, batch.size);
+    }
+
+    batch.size++;
+
+    // Batch size corresponds to the number of written rows out of 1024 total rows (by default)
+    // in the row batch, add the batch to file once all rows are filled and reset.
+    if (batch.size == batch.getMaxSize()) {
+      writer.addRowBatch(batch);
+      batch.reset();
+      batch.size = 0;
+    }
+
+    if (orcConfig.useBloomFilter()) {
+      orcConfig.getBloomFilter().add(recordKey);
+      if (minRecordKey != null) {
+        minRecordKey = minRecordKey.compareTo(recordKey) <= 0 ? minRecordKey : recordKey;
+      } else {
+        minRecordKey = recordKey;
+      }
+
+      if (maxRecordKey != null) {
+        maxRecordKey = maxRecordKey.compareTo(recordKey) >= 0 ? maxRecordKey : recordKey;
+      } else {
+        maxRecordKey = recordKey;
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (batch.size != 0) {
+      writer.addRowBatch(batch);
+      batch.reset();
+    }
+
+    if (orcConfig.useBloomFilter()) {
+      final BloomFilter bloomFilter = orcConfig.getBloomFilter();
+      writer.addUserMetadata(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, ByteBuffer.wrap(bloomFilter.serializeToString().getBytes()));
+      if (minRecordKey != null && maxRecordKey != null) {
+        writer.addUserMetadata(HOODIE_MIN_RECORD_KEY_FOOTER, ByteBuffer.wrap(minRecordKey.getBytes()));
+        writer.addUserMetadata(HOODIE_MAX_RECORD_KEY_FOOTER, ByteBuffer.wrap(maxRecordKey.getBytes()));
+      }
+      if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
+        writer.addUserMetadata(HOODIE_BLOOM_FILTER_TYPE_CODE, ByteBuffer.wrap(bloomFilter.getBloomFilterTypeCode().name().getBytes()));
+      }
+    }
+    writer.addUserMetadata(HoodieOrcConfig.AVRO_SCHEMA_METADATA_KEY, ByteBuffer.wrap(avroSchema.toString().getBytes()));
+
+    writer.close();
+  }
+
+  @Override
+  public long getBytesWritten() {
+    return fs.getBytesWritten(file);
+  }
+}
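
HoodieOrcWriter above buffers rows into a VectorizedRowBatch and flushes the batch to the ORC Writer whenever it fills, with a final flush in close(). A standalone sketch of that same batch-then-flush pattern against the plain ORC API (same imports as HoodieOrcWriter.java above, hypothetical schema and output path, not part of this commit):

    TypeDescription schema = TypeDescription.fromString("struct<_row_key:string,number:int>");
    Writer orcWriter = OrcFile.createWriter(new Path("/tmp/example.orc"),
        OrcFile.writerOptions(new Configuration()).setSchema(schema).compress(CompressionKind.ZLIB));
    VectorizedRowBatch batch = schema.createRowBatch();
    BytesColumnVector keyCol = (BytesColumnVector) batch.cols[0];
    LongColumnVector numCol = (LongColumnVector) batch.cols[1];
    for (int i = 0; i < 10000; i++) {
      int row = batch.size++;
      byte[] key = ("key" + i).getBytes(StandardCharsets.UTF_8);
      keyCol.setRef(row, key, 0, key.length);   // fill one row per column vector
      numCol.vector[row] = i;
      if (batch.size == batch.getMaxSize()) {   // batch is full (1024 rows by default): flush and reuse
        orcWriter.addRowBatch(batch);
        batch.reset();
      }
    }
    if (batch.size != 0) {                      // flush the partially filled tail batch
      orcWriter.addRowBatch(batch);
    }
    orcWriter.close();
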
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
index c3939d7..b6e77bc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.io.storage;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
@@ -27,7 +26,6 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.ParquetFileWriter;
@@ -75,11 +73,8 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
 
   @Override
   public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
-    String seqId =
-        HoodieRecord.generateSequenceId(instantTime, taskContextSupplier.getPartitionIdSupplier().get(), recordIndex.getAndIncrement());
-    HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(),
-        file.getName());
-    HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
+    prepRecordWithMetadata(avroRecord, record, instantTime,
+        taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
     super.write(avroRecord);
     writeSupport.add(record.getRecordKey());
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 512518c..0ff2093 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -656,6 +656,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
   public HoodieLogBlockType getLogDataBlockFormat() {
     switch (getBaseFileFormat()) {
       case PARQUET:
+      case ORC:
         return HoodieLogBlockType.AVRO_DATA_BLOCK;
       case HFILE:
         return HoodieLogBlockType.HFILE_DATA_BLOCK;
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java
new file mode 100644
index 0000000..d69bc70
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
+import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
+import static org.apache.hudi.io.storage.HoodieOrcConfig.AVRO_SCHEMA_METADATA_KEY;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieOrcReaderWriter {
+  private final Path filePath = new Path(System.getProperty("java.io.tmpdir") + "/f1_1-0-1_000.orc");
+
+  @BeforeEach
+  @AfterEach
+  public void clearTempFile() {
+    File file = new File(filePath.toString());
+    if (file.exists()) {
+      file.delete();
+    }
+  }
+
+  private HoodieOrcWriter createOrcWriter(Schema avroSchema) throws Exception {
+    BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.00001, 
-1, BloomFilterTypeCode.SIMPLE.name());
+    Configuration conf = new Configuration();
+    int orcStripSize = 
Integer.parseInt(HoodieStorageConfig.DEFAULT_ORC_STRIPE_SIZE);
+    int orcBlockSize = 
Integer.parseInt(HoodieStorageConfig.DEFAULT_ORC_BLOCK_SIZE);
+    int maxFileSize = 
Integer.parseInt(HoodieStorageConfig.DEFAULT_ORC_FILE_MAX_BYTES);
+    HoodieOrcConfig config = new HoodieOrcConfig(conf, CompressionKind.ZLIB, 
orcStripSize, orcBlockSize, maxFileSize, filter);
+    TaskContextSupplier mockTaskContextSupplier = 
Mockito.mock(TaskContextSupplier.class);
+    String instantTime = "000";
+    return new HoodieOrcWriter(instantTime, filePath, config, avroSchema, 
mockTaskContextSupplier);
+  }
+
+  @Test
+  public void testWriteReadMetadata() throws Exception {
+    Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, 
"/exampleSchema.avsc");
+    HoodieOrcWriter writer = createOrcWriter(avroSchema);
+    for (int i = 0; i < 3; i++) {
+      GenericRecord record = new GenericData.Record(avroSchema);
+      record.put("_row_key", "key" + i);
+      record.put("time", Integer.toString(i));
+      record.put("number", i);
+      writer.writeAvro("key" + i, record);
+    }
+    writer.close();
+
+    Configuration conf = new Configuration();
+    Reader orcReader = OrcFile.createReader(filePath, 
OrcFile.readerOptions(conf));
+    assertEquals(4, orcReader.getMetadataKeys().size());
+    
assertTrue(orcReader.getMetadataKeys().contains(HOODIE_MIN_RECORD_KEY_FOOTER));
+    
assertTrue(orcReader.getMetadataKeys().contains(HOODIE_MAX_RECORD_KEY_FOOTER));
+    
assertTrue(orcReader.getMetadataKeys().contains(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY));
+    assertTrue(orcReader.getMetadataKeys().contains(AVRO_SCHEMA_METADATA_KEY));
+    assertEquals(CompressionKind.ZLIB.name(), 
orcReader.getCompressionKind().toString());
+
+    HoodieFileReader<GenericRecord> hoodieReader = 
HoodieFileReaderFactory.getFileReader(conf, filePath);
+    BloomFilter filter = hoodieReader.readBloomFilter();
+    for (int i = 0; i < 3; i++) {
+      assertTrue(filter.mightContain("key" + i));
+    }
+    assertFalse(filter.mightContain("non-existent-key"));
+    assertEquals(3, hoodieReader.getTotalRecords());
+    String[] minMaxRecordKeys = hoodieReader.readMinMaxRecordKeys();
+    assertEquals(2, minMaxRecordKeys.length);
+    assertEquals("key0", minMaxRecordKeys[0]);
+    assertEquals("key2", minMaxRecordKeys[1]);
+  }
+
+  @Test
+  public void testWriteReadPrimitiveRecord() throws Exception {
+    Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, 
"/exampleSchema.avsc");
+    HoodieOrcWriter writer = createOrcWriter(avroSchema);
+    for (int i = 0; i < 3; i++) {
+      GenericRecord record = new GenericData.Record(avroSchema);
+      record.put("_row_key", "key" + i);
+      record.put("time", Integer.toString(i));
+      record.put("number", i);
+      writer.writeAvro("key" + i, record);
+    }
+    writer.close();
+
+    Configuration conf = new Configuration();
+    Reader orcReader = OrcFile.createReader(filePath, 
OrcFile.readerOptions(conf));
+    assertEquals("struct<_row_key:string,time:string,number:int>", 
orcReader.getSchema().toString());
+    assertEquals(3, orcReader.getNumberOfRows());
+
+    HoodieFileReader<GenericRecord> hoodieReader = 
HoodieFileReaderFactory.getFileReader(conf, filePath);
+    Iterator<GenericRecord> iter = hoodieReader.getRecordIterator();
+    int index = 0;
+    while (iter.hasNext()) {
+      GenericRecord record = iter.next();
+      assertEquals("key" + index, record.get("_row_key").toString());
+      assertEquals(Integer.toString(index), record.get("time").toString());
+      assertEquals(index, record.get("number"));
+      index++;
+    }
+  }
+
+  @Test
+  public void testWriteReadComplexRecord() throws Exception {
+    Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, 
"/exampleSchemaWithUDT.avsc");
+    Schema udtSchema = 
avroSchema.getField("driver").schema().getTypes().get(1);
+    HoodieOrcWriter writer = createOrcWriter(avroSchema);
+    for (int i = 0; i < 3; i++) {
+      GenericRecord record = new GenericData.Record(avroSchema);
+      record.put("_row_key", "key" + i);
+      record.put("time", Integer.toString(i));
+      record.put("number", i);
+      GenericRecord innerRecord = new GenericData.Record(udtSchema);
+      innerRecord.put("driver_name", "driver" + i);
+      innerRecord.put("list", Collections.singletonList(i));
+      innerRecord.put("map", Collections.singletonMap("key" + i, "value" + i));
+      record.put("driver", innerRecord);
+      writer.writeAvro("key" + i, record);
+    }
+    writer.close();
+
+    Configuration conf = new Configuration();
+    Reader reader = OrcFile.createReader(filePath, 
OrcFile.readerOptions(conf));
+    
assertEquals("struct<_row_key:string,time:string,number:int,driver:struct<driver_name:string,list:array<int>,map:map<string,string>>>",
+        reader.getSchema().toString());
+    assertEquals(3, reader.getNumberOfRows());
+
+    HoodieFileReader<GenericRecord> hoodieReader = 
HoodieFileReaderFactory.getFileReader(conf, filePath);
+    Iterator<GenericRecord> iter = hoodieReader.getRecordIterator();
+    int index = 0;
+    while (iter.hasNext()) {
+      GenericRecord record = iter.next();
+      assertEquals("key" + index, record.get("_row_key").toString());
+      assertEquals(Integer.toString(index), record.get("time").toString());
+      assertEquals(index, record.get("number"));
+      GenericRecord innerRecord = (GenericRecord) record.get("driver");
+      assertEquals("driver" + index, 
innerRecord.get("driver_name").toString());
+      assertEquals(1, ((List<?>)innerRecord.get("list")).size());
+      assertEquals(index, ((List<?>)innerRecord.get("list")).get(0));
+      assertEquals("value" + index, 
((Map<?,?>)innerRecord.get("map")).get("key" + index).toString());
+      index++;
+    }
+  }
+
+  @Test
+  public void testWriteReadWithEvolvedSchema() throws Exception {
+    Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, 
"/exampleSchema.avsc");
+    HoodieOrcWriter writer = createOrcWriter(avroSchema);
+    for (int i = 0; i < 3; i++) {
+      GenericRecord record = new GenericData.Record(avroSchema);
+      record.put("_row_key", "key" + i);
+      record.put("time", Integer.toString(i));
+      record.put("number", i);
+      writer.writeAvro("key" + i, record);
+    }
+    writer.close();
+
+    Configuration conf = new Configuration();
+    HoodieFileReader<GenericRecord> hoodieReader = 
HoodieFileReaderFactory.getFileReader(conf, filePath);
+    Schema evolvedSchema = 
getSchemaFromResource(TestHoodieOrcReaderWriter.class, 
"/exampleEvolvedSchema.avsc");
+    Iterator<GenericRecord> iter = 
hoodieReader.getRecordIterator(evolvedSchema);
+    int index = 0;
+    while (iter.hasNext()) {
+      GenericRecord record = iter.next();
+      assertEquals("key" + index, record.get("_row_key").toString());
+      assertEquals(Integer.toString(index), record.get("time").toString());
+      assertEquals(index, record.get("number"));
+      assertNull(record.get("added_field"));
+      index++;
+    }
+
+    evolvedSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, 
"/exampleEvolvedSchemaChangeOrder.avsc");
+    iter = hoodieReader.getRecordIterator(evolvedSchema);
+    index = 0;
+    while (iter.hasNext()) {
+      GenericRecord record = iter.next();
+      assertEquals("key" + index, record.get("_row_key").toString());
+      assertEquals(Integer.toString(index), record.get("time").toString());
+      assertEquals(index, record.get("number"));
+      assertNull(record.get("added_field"));
+      index++;
+    }
+
+    evolvedSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, 
"/exampleEvolvedSchemaColumnRequire.avsc");
+    iter = hoodieReader.getRecordIterator(evolvedSchema);
+    index = 0;
+    while (iter.hasNext()) {
+      GenericRecord record = iter.next();
+      assertEquals("key" + index, record.get("_row_key").toString());
+      assertEquals(Integer.toString(index), record.get("time").toString());
+      assertEquals(index, record.get("number"));
+      assertNull(record.get("added_field"));
+      index++;
+    }
+
+    evolvedSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, 
"/exampleEvolvedSchemaColumnType.avsc");
+    iter = hoodieReader.getRecordIterator(evolvedSchema);
+    index = 0;
+    while (iter.hasNext()) {
+      GenericRecord record = iter.next();
+      assertEquals("key" + index, record.get("_row_key").toString());
+      assertEquals(Integer.toString(index), record.get("time").toString());
+      assertEquals(Integer.toString(index), record.get("number").toString());
+      assertNull(record.get("added_field"));
+      index++;
+    }
+
+    evolvedSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, 
"/exampleEvolvedSchemaDeleteColumn.avsc");
+    iter = hoodieReader.getRecordIterator(evolvedSchema);
+    index = 0;
+    while (iter.hasNext()) {
+      GenericRecord record = iter.next();
+      assertEquals("key" + index, record.get("_row_key").toString());
+      assertEquals(Integer.toString(index), record.get("time").toString());
+      assertNull(record.get("number"));
+      assertNull(record.get("added_field"));
+      index++;
+    }
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/test/resources/exampleSchemaWithUDT.avsc b/hudi-client/hudi-client-common/src/test/resources/exampleSchemaWithUDT.avsc
new file mode 100644
index 0000000..4c40fb2
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/resources/exampleSchemaWithUDT.avsc
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+    "namespace": "example.schema",
+    "type": "record",
+    "name": "trip",
+    "fields": [
+        {
+            "name": "_row_key",
+            "type": "string"
+        },
+        {
+            "name": "time",
+            "type": "string"
+        },
+        {
+            "name": "number",
+            "type": ["null", "int"]
+        },
+        {
+            "name": "driver",
+            "type": [
+              "null",
+              {
+                "name": "person",
+                "type": "record",
+                "fields": [
+                    {
+                        "default": null,
+                        "name": "driver_name",
+                        "type": ["null", "string"]
+                    },
+                    {
+                        "name": "list",
+                        "type": {
+                            "type": "array",
+                            "items": "int"
+                        }
+                    },
+                    {
+                        "name": "map",
+                        "type": {
+                            "type": "map",
+                            "values": "string"
+                        }
+                    }
+                ]
+              }
+            ]
+        }
+    ]
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java
index 26f431a..b7f34ab 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java
@@ -51,11 +51,18 @@ public class TestHoodieFileWriterFactory extends HoodieClientTestBase {
         parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
     assertTrue(parquetWriter instanceof HoodieParquetWriter);
 
+    // hfile format.
     final Path hfilePath = new Path(basePath + "/partition/path/f1_1-0-1_000.hfile");
     HoodieFileWriter<IndexedRecord> hfileWriter = HoodieFileWriterFactory.getFileWriter(instantTime,
         hfilePath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
     assertTrue(hfileWriter instanceof HoodieHFileWriter);
 
+    // orc file format.
+    final Path orcPath = new Path(basePath + "/partition/path/f1_1-0-1_000.orc");
+    HoodieFileWriter<IndexedRecord> orcFileWriter = HoodieFileWriterFactory.getFileWriter(instantTime,
+        orcPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
+    assertTrue(orcFileWriter instanceof HoodieOrcWriter);
+
     // other file format exception.
     final Path logPath = new Path(basePath + "/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1");
     final Throwable thrown = assertThrows(UnsupportedOperationException.class, () -> {
diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml
index 6ec0d95..a41e73e 100644
--- a/hudi-common/pom.xml
+++ b/hudi-common/pom.xml
@@ -119,6 +119,14 @@
       <artifactId>parquet-avro</artifactId>
     </dependency>
 
+    <!-- ORC -->
+    <dependency>
+      <groupId>org.apache.orc</groupId>
+      <artifactId>orc-core</artifactId>
+      <version>${orc.version}</version>
+      <classifier>nohive</classifier>
+    </dependency>
+
     <!-- Httpcomponents -->
     <dependency>
       <groupId>org.apache.httpcomponents</groupId>
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
index 552c38f..f7fdcd0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
@@ -24,7 +24,8 @@ package org.apache.hudi.common.model;
 public enum HoodieFileFormat {
   PARQUET(".parquet"),
   HOODIE_LOG(".log"),
-  HFILE(".hfile");
+  HFILE(".hfile"),
+  ORC(".orc");
 
   private final String extension;
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java
new file mode 100644
index 0000000..0f1f49f
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java
@@ -0,0 +1,799 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.Base64;
+import java.util.Date;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericData;
+import java.nio.charset.StandardCharsets;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData.StringType;
+import org.apache.avro.util.Utf8;
+import org.apache.orc.storage.common.type.HiveDecimal;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
+import org.apache.orc.storage.serde2.io.DateWritable;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.orc.TypeDescription;
+
+/**
+ * Methods including addToVector, addUnionValue, createOrcSchema are originally from
+ * https://github.com/streamsets/datacollector.
+ * Source classes:
+ * - com.streamsets.pipeline.lib.util.avroorc.AvroToOrcRecordConverter
+ * - com.streamsets.pipeline.lib.util.avroorc.AvroToOrcSchemaConverter
+ *
+ * Changes made:
+ * 1. Flatten nullable Avro schema type when the value is not null in `addToVector`.
+ * 2. Use getLogicalType(), constants from LogicalTypes instead of getJsonProp() to handle Avro logical types.
+ */
+public class AvroOrcUtils {
+
+  private static final int MICROS_PER_MILLI = 1000;
+  private static final int NANOS_PER_MICRO = 1000;
+
+  /**
+   * Add an object (of a given ORC type) to the column vector at a given position.
+   *
+   * @param type        ORC schema of the value Object.
+   * @param colVector   The column vector to store the value Object.
+   * @param avroSchema  Avro schema of the value Object.
+   *                    Only used to check logical types for timestamp unit conversion.
+   * @param value       Object to be added to the column vector
+   * @param vectorPos   The position in the vector where value will be stored at.
+   */
+  public static void addToVector(TypeDescription type, ColumnVector colVector, Schema avroSchema, Object value, int vectorPos) {
+
+    final int currentVecLength = colVector.isNull.length;
+    if (vectorPos >= currentVecLength) {
+      colVector.ensureSize(2 * currentVecLength, true);
+    }
+    if (value == null) {
+      colVector.isNull[vectorPos] = true;
+      colVector.noNulls = false;
+      return;
+    }
+
+    if (avroSchema.getType().equals(Schema.Type.UNION)) {
+      avroSchema = getActualSchemaType(avroSchema);
+    }
+
+    LogicalType logicalType = avroSchema != null ? avroSchema.getLogicalType() 
: null;
+
+    switch (type.getCategory()) {
+      case BOOLEAN:
+        LongColumnVector boolVec = (LongColumnVector) colVector;
+        boolVec.vector[vectorPos] = (boolean) value ? 1 : 0;
+        break;
+      case BYTE:
+        LongColumnVector byteColVec = (LongColumnVector) colVector;
+        byteColVec.vector[vectorPos] = (byte) value;
+        break;
+      case SHORT:
+        LongColumnVector shortColVec = (LongColumnVector) colVector;
+        shortColVec.vector[vectorPos] = (short) value;
+        break;
+      case INT:
+        // the Avro logical type could be 
AvroTypeUtil.LOGICAL_TYPE_TIME_MILLIS, but we will ignore that fact here
+        // since Orc has no way to represent a time in the way Avro defines 
it; we will simply preserve the int value
+        LongColumnVector intColVec = (LongColumnVector) colVector;
+        intColVec.vector[vectorPos] = (int) value;
+        break;
+      case LONG:
+        // the Avro logical type could be 
AvroTypeUtil.LOGICAL_TYPE_TIME_MICROS, but we will ignore that fact here
+        // since Orc has no way to represent a time in the way Avro defines 
it; we will simply preserve the long value
+        LongColumnVector longColVec = (LongColumnVector) colVector;
+        longColVec.vector[vectorPos] = (long) value;
+        break;
+      case FLOAT:
+        DoubleColumnVector floatColVec = (DoubleColumnVector) colVector;
+        floatColVec.vector[vectorPos] = (float) value;
+        break;
+      case DOUBLE:
+        DoubleColumnVector doubleColVec = (DoubleColumnVector) colVector;
+        doubleColVec.vector[vectorPos] = (double) value;
+        break;
+      case VARCHAR:
+      case CHAR:
+      case STRING:
+        BytesColumnVector bytesColVec = (BytesColumnVector) colVector;
+        byte[] bytes = null;
+
+        if (value instanceof String) {
+          bytes = ((String) value).getBytes(StandardCharsets.UTF_8);
+        } else if (value instanceof Utf8) {
+          final Utf8 utf8 = (Utf8) value;
+          bytes = utf8.getBytes();
+        } else if (value instanceof GenericData.EnumSymbol) {
+          bytes = ((GenericData.EnumSymbol) 
value).toString().getBytes(StandardCharsets.UTF_8);
+        } else {
+          throw new IllegalStateException(String.format(
+              "Unrecognized type for Avro %s field value, which has type %s, 
value %s",
+              type.getCategory().getName(),
+              value.getClass().getName(),
+              value.toString()
+          ));
+        }
+
+        if (bytes == null) {
+          bytesColVec.isNull[vectorPos] = true;
+          bytesColVec.noNulls = false;
+        } else {
+          bytesColVec.setRef(vectorPos, bytes, 0, bytes.length);
+        }
+        break;
+      case DATE:
+        LongColumnVector dateColVec = (LongColumnVector) colVector;
+        int daysSinceEpoch;
+        if (logicalType instanceof LogicalTypes.Date) {
+          daysSinceEpoch = (int) value;
+        } else if (value instanceof java.sql.Date) {
+          daysSinceEpoch = DateWritable.dateToDays((java.sql.Date) value);
+        } else if (value instanceof Date) {
+          daysSinceEpoch = DateWritable.millisToDays(((Date) value).getTime());
+        } else {
+          throw new IllegalStateException(String.format(
+              "Unrecognized type for Avro DATE field value, which has type %s, 
value %s",
+              value.getClass().getName(),
+              value.toString()
+          ));
+        }
+        dateColVec.vector[vectorPos] = daysSinceEpoch;
+        break;
+      case TIMESTAMP:
+        TimestampColumnVector tsColVec = (TimestampColumnVector) colVector;
+
+        long time;
+        int nanos = 0;
+
+        // The unit for Timestamp in ORC is millis, convert timestamp to 
millis if needed
+        if (logicalType instanceof LogicalTypes.TimestampMillis) {
+          time = (long) value;
+        } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
+          final long logicalTsValue = (long) value;
+          time = logicalTsValue / MICROS_PER_MILLI;
+          nanos = NANOS_PER_MICRO * ((int) (logicalTsValue % 
MICROS_PER_MILLI));
+        } else if (value instanceof Timestamp) {
+          Timestamp tsValue = (Timestamp) value;
+          time = tsValue.getTime();
+          nanos = tsValue.getNanos();
+        } else if (value instanceof java.sql.Date) {
+          java.sql.Date sqlDateValue = (java.sql.Date) value;
+          time = sqlDateValue.getTime();
+        } else if (value instanceof Date) {
+          Date dateValue = (Date) value;
+          time = dateValue.getTime();
+        } else {
+          throw new IllegalStateException(String.format(
+              "Unrecognized type for Avro TIMESTAMP field value, which has 
type %s, value %s",
+              value.getClass().getName(),
+              value.toString()
+          ));
+        }
+
+        tsColVec.time[vectorPos] = time;
+        tsColVec.nanos[vectorPos] = nanos;
+        break;
+      case BINARY:
+        BytesColumnVector binaryColVec = (BytesColumnVector) colVector;
+
+        byte[] binaryBytes;
+        if (value instanceof GenericData.Fixed) {
+          binaryBytes = ((GenericData.Fixed)value).bytes();
+        } else if (value instanceof ByteBuffer) {
+          final ByteBuffer byteBuffer = (ByteBuffer) value;
+          binaryBytes = new byte[byteBuffer.remaining()];
+          byteBuffer.get(binaryBytes);
+        } else if (value instanceof byte[]) {
+          binaryBytes = (byte[]) value;
+        } else {
+          throw new IllegalStateException(String.format(
+              "Unrecognized type for Avro BINARY field value, which has type 
%s, value %s",
+              value.getClass().getName(),
+              value.toString()
+          ));
+        }
+        binaryColVec.setRef(vectorPos, binaryBytes, 0, binaryBytes.length);
+        break;
+      case DECIMAL:
+        DecimalColumnVector decimalColVec = (DecimalColumnVector) colVector;
+        HiveDecimal decimalValue;
+        if (value instanceof BigDecimal) {
+          final BigDecimal decimal = (BigDecimal) value;
+          decimalValue = HiveDecimal.create(decimal);
+        } else if (value instanceof ByteBuffer) {
+          final ByteBuffer byteBuffer = (ByteBuffer) value;
+          final byte[] decimalBytes = new byte[byteBuffer.remaining()];
+          byteBuffer.get(decimalBytes);
+          final BigInteger bigInt = new BigInteger(decimalBytes);
+          final int scale = type.getScale();
+          BigDecimal bigDecVal = new BigDecimal(bigInt, scale);
+
+          decimalValue = HiveDecimal.create(bigDecVal);
+          if (decimalValue == null && decimalBytes.length > 0) {
+            throw new IllegalStateException(
+                "Unexpectedly read null HiveDecimal from bytes (base-64 encoded): "
+                    + Base64.getEncoder().encodeToString(decimalBytes)
+            );
+          }
+        } else if (value instanceof GenericData.Fixed) {
+          final BigDecimal decimal = new Conversions.DecimalConversion()
+              .fromFixed((GenericData.Fixed) value, avroSchema, logicalType);
+          decimalValue = HiveDecimal.create(decimal);
+        } else {
+          throw new IllegalStateException(String.format(
+              "Unexpected type for decimal (%s), cannot convert from Avro value",
+              value.getClass().getCanonicalName()
+          ));
+        }
+        if (decimalValue == null) {
+          decimalColVec.isNull[vectorPos] = true;
+          decimalColVec.noNulls = false;
+        } else {
+          decimalColVec.set(vectorPos, decimalValue);
+        }
+        break;
+      case LIST:
+        List<?> list = (List<?>) value;
+        ListColumnVector listColVec = (ListColumnVector) colVector;
+        listColVec.offsets[vectorPos] = listColVec.childCount;
+        listColVec.lengths[vectorPos] = list.size();
+
+        TypeDescription listType = type.getChildren().get(0);
+        for (Object listItem : list) {
+          addToVector(listType, listColVec.child, avroSchema.getElementType(), listItem, listColVec.childCount++);
+        }
+        break;
+      case MAP:
+        Map<String, ?> mapValue = (Map<String, ?>) value;
+
+        MapColumnVector mapColumnVector = (MapColumnVector) colVector;
+        mapColumnVector.offsets[vectorPos] = mapColumnVector.childCount;
+        mapColumnVector.lengths[vectorPos] = mapValue.size();
+
+        // keys are always strings
+        Schema keySchema = Schema.create(Schema.Type.STRING);
+        for (Map.Entry<String, ?> entry : mapValue.entrySet()) {
+          addToVector(
+              type.getChildren().get(0),
+              mapColumnVector.keys,
+              keySchema,
+              entry.getKey(),
+              mapColumnVector.childCount
+          );
+
+          addToVector(
+              type.getChildren().get(1),
+              mapColumnVector.values,
+              avroSchema.getValueType(),
+              entry.getValue(),
+              mapColumnVector.childCount
+          );
+
+          mapColumnVector.childCount++;
+        }
+
+        break;
+      case STRUCT:
+        StructColumnVector structColVec = (StructColumnVector) colVector;
+
+        GenericData.Record record = (GenericData.Record) value;
+
+        for (int i = 0; i < type.getFieldNames().size(); i++) {
+          String fieldName = type.getFieldNames().get(i);
+          Object fieldValue = record.get(fieldName);
+          TypeDescription fieldType = type.getChildren().get(i);
+          addToVector(fieldType, structColVec.fields[i], avroSchema.getFields().get(i).schema(), fieldValue, vectorPos);
+        }
+
+        break;
+      case UNION:
+        UnionColumnVector unionColVec = (UnionColumnVector) colVector;
+
+        List<TypeDescription> childTypes = type.getChildren();
+        boolean added = addUnionValue(unionColVec, childTypes, avroSchema, value, vectorPos);
+
+        if (!added) {
+          throw new IllegalStateException(String.format(
+              "Failed to add value %s to union with type %s",
+              value == null ? "null" : value.toString(),
+              type.toString()
+          ));
+        }
+
+        break;
+      default:
+        throw new IllegalArgumentException("Invalid TypeDescription " + type.toString() + ".");
+    }
+  }
+
+  /**
+   * Match a value with its ORC type and add it to the union vector at a given position.
+   *
+   * @param unionVector       The vector to store the value.
+   * @param unionChildTypes   All possible types for the value Object.
+   * @param avroSchema        Avro union schema for the value Object.
+   * @param value             Object to be added to the unionVector.
+   * @param vectorPos         The position in the vector where the value will be stored.
+   * @return                  Whether the value was added successfully.
+   */
+  public static boolean addUnionValue(
+      UnionColumnVector unionVector,
+      List<TypeDescription> unionChildTypes,
+      Schema avroSchema,
+      Object value,
+      int vectorPos
+  ) {
+    int matchIndex = -1;
+    TypeDescription matchType = null;
+    Object matchValue = null;
+
+    for (int t = 0; t < unionChildTypes.size(); t++) {
+      TypeDescription childType = unionChildTypes.get(t);
+      boolean matches = false;
+
+      switch (childType.getCategory()) {
+        case BOOLEAN:
+          matches = value instanceof Boolean;
+          break;
+        case BYTE:
+          matches = value instanceof Byte;
+          break;
+        case SHORT:
+          matches = value instanceof Short;
+          break;
+        case INT:
+          matches = value instanceof Integer;
+          break;
+        case LONG:
+          matches = value instanceof Long;
+          break;
+        case FLOAT:
+          matches = value instanceof Float;
+          break;
+        case DOUBLE:
+          matches = value instanceof Double;
+          break;
+        case STRING:
+        case VARCHAR:
+        case CHAR:
+          if (value instanceof String) {
+            matches = true;
+            matchValue = ((String) value).getBytes(StandardCharsets.UTF_8);
+          } else if (value instanceof Utf8) {
+            matches = true;
+            matchValue = ((Utf8) value).getBytes();
+          }
+          break;
+        case DATE:
+          matches = value instanceof Date;
+          break;
+        case TIMESTAMP:
+          matches = value instanceof Timestamp;
+          break;
+        case BINARY:
+          matches = value instanceof byte[] || value instanceof GenericData.Fixed;
+          break;
+        case DECIMAL:
+          matches = value instanceof BigDecimal;
+          break;
+        case LIST:
+          matches = value instanceof List;
+          break;
+        case MAP:
+          matches = value instanceof Map;
+          break;
+        case STRUCT:
+          throw new UnsupportedOperationException("Cannot handle STRUCT within UNION.");
+        case UNION:
+          List<TypeDescription> children = childType.getChildren();
+          if (value == null) {
+            matches = children == null || children.size() == 0;
+          } else {
+            matches = addUnionValue(unionVector, children, avroSchema, value, vectorPos);
+          }
+          break;
+        default:
+          throw new IllegalArgumentException("Invalid TypeDescription " + childType.getCategory().toString() + ".");
+      }
+
+      if (matches) {
+        matchIndex = t;
+        matchType = childType;
+        break;
+      }
+    }
+
+    if (value == null && matchValue != null) {
+      value = matchValue;
+    }
+
+    if (matchIndex >= 0) {
+      unionVector.tags[vectorPos] = matchIndex;
+      if (value == null) {
+        unionVector.isNull[vectorPos] = true;
+        unionVector.noNulls = false;
+      } else {
+        addToVector(matchType, unionVector.fields[matchIndex], avroSchema.getTypes().get(matchIndex), value, vectorPos);
+      }
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Read the value at a given position from a column vector, conforming to a given ORC schema.
+   *
+   * @param type        ORC schema of the object to read.
+   * @param colVector   The column vector to read.
+   * @param avroSchema  Avro schema of the object to read.
+   *                    Only used to check logical types for timestamp unit conversion.
+   * @param vectorPos   The position in the vector where the value to read is stored.
+   * @return            The object being read.
+   */
+  public static Object readFromVector(TypeDescription type, ColumnVector colVector, Schema avroSchema, int vectorPos) {
+
+    if (colVector.isRepeating) {
+      vectorPos = 0;
+    }
+
+    if (colVector.isNull[vectorPos]) {
+      return null;
+    }
+
+    if (avroSchema.getType().equals(Schema.Type.UNION)) {
+      avroSchema = getActualSchemaType(avroSchema);
+    }
+    LogicalType logicalType = avroSchema != null ? avroSchema.getLogicalType() : null;
+
+    switch (type.getCategory()) {
+      case BOOLEAN:
+        return ((LongColumnVector) colVector).vector[vectorPos] != 0;
+      case BYTE:
+        return (byte) ((LongColumnVector) colVector).vector[vectorPos];
+      case SHORT:
+        return (short) ((LongColumnVector) colVector).vector[vectorPos];
+      case INT:
+        return (int) ((LongColumnVector) colVector).vector[vectorPos];
+      case LONG:
+        return ((LongColumnVector) colVector).vector[vectorPos];
+      case FLOAT:
+        return (float) ((DoubleColumnVector) colVector).vector[vectorPos];
+      case DOUBLE:
+        return ((DoubleColumnVector) colVector).vector[vectorPos];
+      case VARCHAR:
+      case CHAR:
+        int maxLength = type.getMaxLength();
+        String result = ((BytesColumnVector) colVector).toString(vectorPos);
+        if (result.length() <= maxLength) {
+          return result;
+        } else {
+          throw new HoodieIOException("CHAR/VARCHAR has length " + result.length() + " greater than the max length allowed");
+        }
+      case STRING:
+        String stringType = avroSchema.getProp(GenericData.STRING_PROP);
+        if (stringType == null || !stringType.equals(StringType.String)) {
+          int stringLength = ((BytesColumnVector) colVector).length[vectorPos];
+          int stringOffset = ((BytesColumnVector) colVector).start[vectorPos];
+          byte[] stringBytes = new byte[stringLength];
+          System.arraycopy(((BytesColumnVector) colVector).vector[vectorPos], stringOffset, stringBytes, 0, stringLength);
+          return new Utf8(stringBytes);
+        } else {
+          return ((BytesColumnVector) colVector).toString(vectorPos);
+        }
+      case DATE:
+        // convert to daysSinceEpoch for LogicalType.Date
+        return (int) ((LongColumnVector) colVector).vector[vectorPos];
+      case TIMESTAMP:
+        // The unit of time in ORC is millis. Convert (time,nanos) to the desired unit per logicalType
+        long time = ((TimestampColumnVector) colVector).time[vectorPos];
+        int nanos = ((TimestampColumnVector) colVector).nanos[vectorPos];
+        if (logicalType instanceof LogicalTypes.TimestampMillis) {
+          return time;
+        } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
+          return time * MICROS_PER_MILLI + nanos / NANOS_PER_MICRO;
+        } else {
+          return ((TimestampColumnVector) colVector).getTimestampAsLong(vectorPos);
+        }
+      case BINARY:
+        int binaryLength = ((BytesColumnVector) colVector).length[vectorPos];
+        int binaryOffset = ((BytesColumnVector) colVector).start[vectorPos];
+        byte[] binaryBytes = new byte[binaryLength];
+        System.arraycopy(((BytesColumnVector) colVector).vector[vectorPos], binaryOffset, binaryBytes, 0, binaryLength);
+        // return a ByteBuffer to be consistent with AvroRecordConverter
+        return ByteBuffer.wrap(binaryBytes);
+      case DECIMAL:
+        // HiveDecimal always ignores trailing zeros, thus modifies the scale implicitly,
+        // therefore, the scale must be enforced here.
+        BigDecimal bigDecimal = ((DecimalColumnVector) colVector).vector[vectorPos]
+            .getHiveDecimal().bigDecimalValue()
+            .setScale(((LogicalTypes.Decimal) logicalType).getScale());
+        Schema.Type baseType = avroSchema.getType();
+        if (baseType.equals(Schema.Type.FIXED)) {
+          return new Conversions.DecimalConversion().toFixed(bigDecimal, avroSchema, logicalType);
+        } else if (baseType.equals(Schema.Type.BYTES)) {
+          return bigDecimal.unscaledValue().toByteArray();
+        } else {
+          throw new HoodieIOException(baseType.getName() + " is not a valid type for LogicalTypes.DECIMAL.");
+        }
+      case LIST:
+        ArrayList<Object> list = new ArrayList<>();
+        ListColumnVector listVector = (ListColumnVector) colVector;
+        int listLength = (int) listVector.lengths[vectorPos];
+        int listOffset = (int) listVector.offsets[vectorPos];
+        list.ensureCapacity(listLength);
+        TypeDescription childType = type.getChildren().get(0);
+        for (int i = 0; i < listLength; i++) {
+          list.add(readFromVector(childType, listVector.child, avroSchema.getElementType(), listOffset + i));
+        }
+        return list;
+      case MAP:
+        Map<String, Object> map = new HashMap<String, Object>();
+        MapColumnVector mapVector = (MapColumnVector) colVector;
+        int mapLength = (int) mapVector.lengths[vectorPos];
+        int mapOffset = (int) mapVector.offsets[vectorPos];
+        // keys are always strings for maps in Avro
+        Schema keySchema = Schema.create(Schema.Type.STRING);
+        for (int i = 0; i < mapLength; i++) {
+          map.put(
+              readFromVector(type.getChildren().get(0), mapVector.keys, keySchema, i + mapOffset).toString(),
+              readFromVector(type.getChildren().get(1), mapVector.values,
+                  avroSchema.getValueType(), i + mapOffset));
+        }
+        return map;
+      case STRUCT:
+        StructColumnVector structVector = (StructColumnVector) colVector;
+        List<TypeDescription> children = type.getChildren();
+        GenericData.Record record = new GenericData.Record(avroSchema);
+        for (int i = 0; i < children.size(); i++) {
+          record.put(i, readFromVector(children.get(i), structVector.fields[i],
+              avroSchema.getFields().get(i).schema(), vectorPos));
+        }
+        return record;
+      case UNION:
+        UnionColumnVector unionVector = (UnionColumnVector) colVector;
+        int tag = unionVector.tags[vectorPos];
+        ColumnVector fieldVector = unionVector.fields[tag];
+        return readFromVector(type.getChildren().get(tag), fieldVector, avroSchema.getTypes().get(tag), vectorPos);
+      default:
+        throw new HoodieIOException("Unrecognized TypeDescription " + type.toString());
+    }
+  }
+
+  public static TypeDescription createOrcSchema(Schema avroSchema) {
+
+    LogicalType logicalType = avroSchema.getLogicalType();
+
+    if (logicalType != null) {
+      if (logicalType instanceof LogicalTypes.Decimal) {
+        return TypeDescription.createDecimal()
+            .withPrecision(((LogicalTypes.Decimal) logicalType).getPrecision())
+            .withScale(((LogicalTypes.Decimal) logicalType).getScale());
+      } else if (logicalType instanceof LogicalTypes.Date) {
+        // The date logical type represents a date within the calendar, with no reference to a particular time zone
+        // or time of day.
+        //
+        // A date logical type annotates an Avro int, where the int stores the number of days from the unix epoch, 1
+        // January 1970 (ISO calendar).
+        return TypeDescription.createDate();
+      } else if (logicalType instanceof LogicalTypes.TimeMillis) {
+        // The time-millis logical type represents a time of day, with no reference to a particular calendar, time
+        // zone or date, with a precision of one millisecond.
+        //
+        // A time-millis logical type annotates an Avro int, where the int stores the number of milliseconds after
+        // midnight, 00:00:00.000.
+        return TypeDescription.createInt();
+      } else if (logicalType instanceof LogicalTypes.TimeMicros) {
+        // The time-micros logical type represents a time of day, with no reference to a particular calendar, time
+        // zone or date, with a precision of one microsecond.
+        //
+        // A time-micros logical type annotates an Avro long, where the long stores the number of microseconds after
+        // midnight, 00:00:00.000000.
+        return TypeDescription.createLong();
+      } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
+        // The timestamp-millis logical type represents an instant on the global timeline, independent of a
+        // particular time zone or calendar, with a precision of one millisecond.
+        //
+        // A timestamp-millis logical type annotates an Avro long, where the long stores the number of milliseconds
+        // from the unix epoch, 1 January 1970 00:00:00.000 UTC.
+        return TypeDescription.createTimestamp();
+      } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
+        // The timestamp-micros logical type represents an instant on the global timeline, independent of a
+        // particular time zone or calendar, with a precision of one microsecond.
+        //
+        // A timestamp-micros logical type annotates an Avro long, where the long stores the number of microseconds
+        // from the unix epoch, 1 January 1970 00:00:00.000000 UTC.
+        return TypeDescription.createTimestamp();
+      }
+    }
+
+    final Schema.Type type = avroSchema.getType();
+    switch (type) {
+      case NULL:
+        // empty union represents null type
+        final TypeDescription nullUnion = TypeDescription.createUnion();
+        return nullUnion;
+      case LONG:
+        return TypeDescription.createLong();
+      case INT:
+        return TypeDescription.createInt();
+      case BYTES:
+        return TypeDescription.createBinary();
+      case ARRAY:
+        return TypeDescription.createList(createOrcSchema(avroSchema.getElementType()));
+      case RECORD:
+        final TypeDescription recordStruct = TypeDescription.createStruct();
+        for (Schema.Field field : avroSchema.getFields()) {
+          final Schema fieldSchema = field.schema();
+          final TypeDescription fieldType = createOrcSchema(fieldSchema);
+          if (fieldType != null) {
+            recordStruct.addField(field.name(), fieldType);
+          }
+        }
+        return recordStruct;
+      case MAP:
+        return TypeDescription.createMap(
+            // in Avro maps, keys are always strings
+            TypeDescription.createString(),
+            createOrcSchema(avroSchema.getValueType())
+        );
+      case UNION:
+        final List<Schema> nonNullMembers = avroSchema.getTypes().stream().filter(
+            schema -> !Schema.Type.NULL.equals(schema.getType())
+        ).collect(Collectors.toList());
+
+        if (nonNullMembers.isEmpty()) {
+          // no non-null union members; represent as an ORC empty union
+          return TypeDescription.createUnion();
+        } else if (nonNullMembers.size() == 1) {
+          // a single non-null union member
+          // this is how Avro represents "nullable" types; as a union of the NULL type with another
+          // since ORC already supports nullability of all types, just use the child type directly
+          return createOrcSchema(nonNullMembers.get(0));
+        } else {
+          // more than one non-null type; represent as an actual ORC union of them
+          final TypeDescription union = TypeDescription.createUnion();
+          for (final Schema childSchema : nonNullMembers) {
+            union.addUnionChild(createOrcSchema(childSchema));
+          }
+          return union;
+        }
+      case STRING:
+        return TypeDescription.createString();
+      case FLOAT:
+        return TypeDescription.createFloat();
+      case DOUBLE:
+        return TypeDescription.createDouble();
+      case BOOLEAN:
+        return TypeDescription.createBoolean();
+      case ENUM:
+        // represent as String for now
+        return TypeDescription.createString();
+      case FIXED:
+        return TypeDescription.createBinary();
+      default:
+        throw new IllegalStateException(String.format("Unrecognized Avro type: %s", type.getName()));
+    }
+  }
+
+  public static Schema createAvroSchema(TypeDescription orcSchema) {
+    switch (orcSchema.getCategory()) {
+      case BOOLEAN:
+        return Schema.create(Schema.Type.BOOLEAN);
+      case BYTE:
+        // tinyint (8 bit), use int to hold it
+        return Schema.create(Schema.Type.INT);
+      case SHORT:
+        // smallint (16 bit), use int to hold it
+        return Schema.create(Schema.Type.INT);
+      case INT:
+        // the Avro logical type could be AvroTypeUtil.LOGICAL_TYPE_TIME_MILLIS, but there is no way to distinguish
+        return Schema.create(Schema.Type.INT);
+      case LONG:
+        // the Avro logical type could be AvroTypeUtil.LOGICAL_TYPE_TIME_MICROS, but there is no way to distinguish
+        return Schema.create(Schema.Type.LONG);
+      case FLOAT:
+        return Schema.create(Schema.Type.FLOAT);
+      case DOUBLE:
+        return Schema.create(Schema.Type.DOUBLE);
+      case VARCHAR:
+      case CHAR:
+      case STRING:
+        return Schema.create(Schema.Type.STRING);
+      case DATE:
+        Schema date = Schema.create(Schema.Type.INT);
+        LogicalTypes.date().addToSchema(date);
+        return date;
+      case TIMESTAMP:
+        // Cannot distinguish between TIMESTAMP_MILLIS and TIMESTAMP_MICROS
+        // Assume TIMESTAMP_MILLIS because Timestamp in ORC is in millis
+        Schema timestamp = Schema.create(Schema.Type.LONG);
+        LogicalTypes.timestampMillis().addToSchema(timestamp);
+        return timestamp;
+      case BINARY:
+        return Schema.create(Schema.Type.BYTES);
+      case DECIMAL:
+        Schema decimal = Schema.create(Schema.Type.BYTES);
+        LogicalTypes.decimal(orcSchema.getPrecision(), orcSchema.getScale()).addToSchema(decimal);
+        return decimal;
+      case LIST:
+        return Schema.createArray(createAvroSchema(orcSchema.getChildren().get(0)));
+      case MAP:
+        return Schema.createMap(createAvroSchema(orcSchema.getChildren().get(1)));
+      case STRUCT:
+        List<Field> childFields = new ArrayList<>();
+        for (int i = 0; i < orcSchema.getChildren().size(); i++) {
+          TypeDescription childType = orcSchema.getChildren().get(i);
+          String childName = orcSchema.getFieldNames().get(i);
+          childFields.add(new Field(childName, createAvroSchema(childType), "", null));
+        }
+        return Schema.createRecord(childFields);
+      case UNION:
+        return Schema.createUnion(orcSchema.getChildren().stream()
+            .map(AvroOrcUtils::createAvroSchema)
+            .collect(Collectors.toList()));
+      default:
+        throw new IllegalStateException(String.format("Unrecognized ORC type: %s", orcSchema.getCategory().getName()));
+    }
+  }
+
+  /**
+   * Returns the actual schema of a field.
+   *
+   * All types in ORC are nullable, whereas Avro uses a union that contains the NULL type to imply
+   * the nullability of an Avro type. To achieve consistency between the Avro and ORC schemas,
+   * non-NULL types are extracted from the union type.
+   * @param unionSchema       A schema of union type.
+   * @return  An Avro schema that is either NULL, the single non-NULL member, or a UNION of the non-NULL members.
+   */
+  private static Schema getActualSchemaType(Schema unionSchema) {
+    final List<Schema> nonNullMembers = unionSchema.getTypes().stream().filter(
+        schema -> !Schema.Type.NULL.equals(schema.getType())
+    ).collect(Collectors.toList());
+    if (nonNullMembers.isEmpty()) {
+      return Schema.create(Schema.Type.NULL);
+    } else if (nonNullMembers.size() == 1) {
+      return nonNullMembers.get(0);
+    } else {
+      return Schema.createUnion(nonNullMembers);
+    }
+  }
+}
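
    As a usage sketch only (not part of the patch above), the converters in AvroOrcUtils can be exercised
    roughly as follows. This assumes addToVector is public, mirroring the public addUnionValue and
    readFromVector shown in the file, and the two-field record schema is a made-up example.

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.hudi.common.util.AvroOrcUtils;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

    public class AvroOrcRoundTripSketch {
      public static void main(String[] args) {
        // Hypothetical two-field record schema
        Schema avroSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}");

        // Avro schema -> ORC schema, and an empty row batch for it
        TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(avroSchema);
        VectorizedRowBatch batch = orcSchema.createRowBatch();

        // Write one record's fields into row 0, column by column
        GenericData.Record record = new GenericData.Record(avroSchema);
        record.put("id", 42L);
        record.put("name", "foo");
        for (int c = 0; c < orcSchema.getChildren().size(); c++) {
          String fieldName = orcSchema.getFieldNames().get(c);
          AvroOrcUtils.addToVector(orcSchema.getChildren().get(c), batch.cols[c],
              avroSchema.getField(fieldName).schema(), record.get(fieldName), 0);
        }
        batch.size = 1;

        // Read the same values back from the column vectors
        for (int c = 0; c < orcSchema.getChildren().size(); c++) {
          String fieldName = orcSchema.getFieldNames().get(c);
          Object value = AvroOrcUtils.readFromVector(orcSchema.getChildren().get(c), batch.cols[c],
              avroSchema.getField(fieldName).schema(), 0);
          System.out.println(fieldName + " = " + value);
        }
      }
    }
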
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
index c52d700..9b95e16 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.util;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -25,16 +26,22 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.exception.HoodieException;
 
 public abstract class BaseFileUtils {
 
   public static BaseFileUtils getInstance(String path) {
     if (path.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
       return new ParquetUtils();
+    } else if (path.endsWith(HoodieFileFormat.ORC.getFileExtension())) {
+      return new OrcUtils();
     }
     throw new UnsupportedOperationException("The format for file " + path + " 
is not supported yet.");
   }
@@ -42,6 +49,8 @@ public abstract class BaseFileUtils {
   public static BaseFileUtils getInstance(HoodieFileFormat fileFormat) {
     if (HoodieFileFormat.PARQUET.equals(fileFormat)) {
       return new ParquetUtils();
+    } else if (HoodieFileFormat.ORC.equals(fileFormat)) {
+      return new OrcUtils();
     }
     throw new UnsupportedOperationException(fileFormat.name() + " format not supported yet.");
   }
@@ -50,24 +59,122 @@ public abstract class BaseFileUtils {
     return getInstance(metaClient.getTableConfig().getBaseFileFormat());
   }
 
-  public abstract Set<String> readRowKeys(Configuration configuration, Path filePath);
-
-  public abstract Set<String> filterRowKeys(Configuration configuration, Path filePath, Set<String> filter);
-
-  public abstract List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath);
-
-  public abstract Schema readAvroSchema(Configuration configuration, Path filePath);
+  /**
+   * Read the rowKey list from the given data file.
+   * @param filePath      The data file path
+   * @param configuration configuration to build fs object
+   * @return Set of row keys
+   */
+  public Set<String> readRowKeys(Configuration configuration, Path filePath) {
+    return filterRowKeys(configuration, filePath, new HashSet<>());
+  }
 
-  public abstract BloomFilter readBloomFilterFromMetadata(Configuration configuration, Path filePath);
+  /**
+   * Read the bloom filter from the metadata of the given data file.
+   * @param configuration Configuration
+   * @param filePath The data file path
+   * @return a BloomFilter object
+   */
+  public BloomFilter readBloomFilterFromMetadata(Configuration configuration, Path filePath) {
+    Map<String, String> footerVals =
+        readFooter(configuration, false, filePath,
+            HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY,
+            HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY,
+            HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE);
+    String footerVal = footerVals.get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
+    if (null == footerVal) {
+      // We use old style key "com.uber.hoodie.bloomfilter"
+      footerVal = footerVals.get(HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
+    }
+    BloomFilter toReturn = null;
+    if (footerVal != null) {
+      if (footerVals.containsKey(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE)) {
+        toReturn = BloomFilterFactory.fromString(footerVal,
+            footerVals.get(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE));
+      } else {
+        toReturn = BloomFilterFactory.fromString(footerVal, BloomFilterTypeCode.SIMPLE.name());
+      }
+    }
+    return toReturn;
+  }
 
-  public abstract String[] readMinMaxRecordKeys(Configuration configuration, Path filePath);
+  /**
+   * Read the min and max record key from the metadata of the given data file.
+   * @param configuration Configuration
+   * @param filePath The data file path
+   * @return An array of two strings where the first is the min record key and the second is the max record key
+   */
+  public String[] readMinMaxRecordKeys(Configuration configuration, Path filePath) {
+    Map<String, String> minMaxKeys = readFooter(configuration, true, filePath,
+        HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER, HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER);
+    if (minMaxKeys.size() != 2) {
+      throw new HoodieException(
+          String.format("Could not read min/max record key out of footer 
correctly from %s. read) : %s",
+              filePath, minMaxKeys));
+    }
+    return new String[] {minMaxKeys.get(HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER),
+        minMaxKeys.get(HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER)};
+  }
 
+  /**
+   * Read the data file.
+   * NOTE: This literally reads the entire file contents, thus should be used with caution.
+   * @param configuration Configuration
+   * @param filePath The data file path
+   * @return A list of GenericRecord
+   */
   public abstract List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath);
 
+  /**
+   * Read the data file using the given schema.
+   * NOTE: This literally reads the entire file contents, thus should be used with caution.
+   * @param configuration Configuration
+   * @param filePath The data file path
+   * @return A list of GenericRecord
+   */
   public abstract List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath, Schema schema);
 
-  public abstract Map<String, String> readFooter(Configuration conf, boolean required, Path orcFilePath,
-      String... footerNames);
+  /**
+   * Read the footer data of the given data file.
+   * @param configuration Configuration
+   * @param required Whether the footer data is required to be present in the data file
+   * @param filePath The data file path
+   * @param footerNames The footer names to read
+   * @return A map where the key is the footer name and the value is the footer value
+   */
+  public abstract Map<String, String> readFooter(Configuration configuration, boolean required, Path filePath,
+                                                 String... footerNames);
+
+  /**
+   * Returns the number of records in the data file.
+   * @param configuration Configuration
+   * @param filePath The data file path
+   */
+  public abstract long getRowCount(Configuration configuration, Path filePath);
+
+  /**
+   * Read the rowKey list matching the given filter, from the given data file.
+   * If the filter is empty, then this will return all the row keys.
+   * @param filePath      The data file path
+   * @param configuration configuration to build fs object
+   * @param filter        record keys filter
+   * @return Set of row keys matching the given filter
+   */
+  public abstract Set<String> filterRowKeys(Configuration configuration, Path filePath, Set<String> filter);
+
+  /**
+   * Fetch {@link HoodieKey}s from the given data file.
+   * @param configuration configuration to build fs object
+   * @param filePath      The data file path
+   * @return {@link List} of {@link HoodieKey}s fetched from the data file
+   */
+  public abstract List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath);
 
-  public abstract long getRowCount(Configuration conf, Path filePath);
-}
\ No newline at end of file
+  /**
+   * Read the Avro schema of the data file.
+   * @param configuration Configuration
+   * @param filePath The data file path
+   * @return The Avro schema of the data file
+   */
+  public abstract Schema readAvroSchema(Configuration configuration, Path filePath);
+}
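
    A usage sketch (illustrative only; the file path is hypothetical): the shared base class now dispatches
    to ParquetUtils or OrcUtils based on the file extension, so index and metadata lookups stay format-agnostic.

    import org.apache.avro.Schema;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.util.BaseFileUtils;

    public class BaseFileUtilsSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        Path baseFile = new Path("/tmp/hoodie/2021/06/15/some-file.orc");  // hypothetical path

        // Resolves to OrcUtils for ".orc" and ParquetUtils for ".parquet"
        BaseFileUtils fileUtils = BaseFileUtils.getInstance(baseFile.toString());

        Schema avroSchema = fileUtils.readAvroSchema(conf, baseFile);
        long rowCount = fileUtils.getRowCount(conf, baseFile);
        String[] minMax = fileUtils.readMinMaxRecordKeys(conf, baseFile);

        System.out.println("rows=" + rowCount + ", minKey=" + minMax[0] + ", maxKey=" + minMax[1]
            + ", schema=" + avroSchema);
      }
    }
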
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcReaderIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcReaderIterator.java
new file mode 100644
index 0000000..4b3caa7
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcReaderIterator.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * This class wraps an ORC reader and provides an iterator-based API to read from an ORC file.
+ */
+public class OrcReaderIterator<T> implements Iterator<T> {
+
+  private final RecordReader recordReader;
+  private final Schema avroSchema;
+  List<String> fieldNames;
+  List<TypeDescription> orcFieldTypes;
+  Schema[] avroFieldSchemas;
+  private VectorizedRowBatch batch;
+  private int rowInBatch;
+  private T next;
+
+  public OrcReaderIterator(RecordReader recordReader, Schema schema, TypeDescription orcSchema) {
+    this.recordReader = recordReader;
+    this.avroSchema = schema;
+    this.fieldNames = orcSchema.getFieldNames();
+    this.orcFieldTypes = orcSchema.getChildren();
+    this.avroFieldSchemas = fieldNames.stream()
+        .map(fieldName -> avroSchema.getField(fieldName).schema())
+        .toArray(size -> new Schema[size]);
+    this.batch = orcSchema.createRowBatch();
+    this.rowInBatch = 0;
+  }
+
+  /**
+   * If the current batch is empty, get a new one.
+   * @return true if we have rows available.
+   * @throws IOException
+   */
+  private boolean ensureBatch() throws IOException {
+    if (rowInBatch >= batch.size) {
+      rowInBatch = 0;
+      return recordReader.nextBatch(batch);
+    }
+    return true;
+  }
+
+  @Override
+  public boolean hasNext() {
+    try {
+      ensureBatch();
+      if (this.next == null) {
+        this.next = (T) readRecordFromBatch();
+      }
+      return this.next != null;
+    } catch (IOException io) {
+      throw new HoodieIOException("Unable to read the next record from ORC file", io);
+    }
+  }
+
+  @Override
+  public T next() {
+    try {
+      // To handle case when next() is called before hasNext()
+      if (this.next == null) {
+        if (!hasNext()) {
+          throw new HoodieIOException("No more records left to read from ORC file");
+        }
+      }
+      T retVal = this.next;
+      this.next = (T) readRecordFromBatch();
+      return retVal;
+    } catch (IOException io) {
+      throw new HoodieIOException("Unable to read the next record from ORC file", io);
+    }
+  }
+
+  private GenericData.Record readRecordFromBatch() throws IOException {
+    // No more records left to read from ORC file
+    if (!ensureBatch()) {
+      return null;
+    }
+
+    GenericData.Record record = new Record(avroSchema);
+    int numFields = orcFieldTypes.size();
+    for (int i = 0; i < numFields; i++) {
+      Object data = AvroOrcUtils.readFromVector(orcFieldTypes.get(i), batch.cols[i], avroFieldSchemas[i], rowInBatch);
+      record.put(fieldNames.get(i), data);
+    }
+    rowInBatch++;
+    return record;
+  }
+}
\ No newline at end of file
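
    A usage sketch (illustrative only; the file path is hypothetical), mirroring OrcUtils.readAvroRecords below:
    wrap an ORC RecordReader in OrcReaderIterator to consume the file as Avro GenericRecords.

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.util.AvroOrcUtils;
    import org.apache.hudi.common.util.OrcReaderIterator;
    import org.apache.orc.OrcFile;
    import org.apache.orc.Reader;
    import org.apache.orc.Reader.Options;
    import org.apache.orc.RecordReader;
    import org.apache.orc.TypeDescription;

    public class OrcReaderIteratorSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("/tmp/hoodie/partition/data.orc");  // hypothetical path

        Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
        TypeDescription orcSchema = reader.getSchema();
        Schema avroSchema = AvroOrcUtils.createAvroSchema(orcSchema);
        RecordReader recordReader = reader.rows(new Options(conf).schema(orcSchema));

        OrcReaderIterator<GenericRecord> iterator = new OrcReaderIterator<>(recordReader, avroSchema, orcSchema);
        while (iterator.hasNext()) {
          GenericRecord record = iterator.next();
          System.out.println(record);  // process each record
        }
      }
    }
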
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
new file mode 100644
index 0000000..9fc49a3
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.MetadataNotFoundException;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto.UserMetadataItem;
+import org.apache.orc.Reader;
+import org.apache.orc.Reader.Options;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+/**
+ * Utility functions for ORC files.
+ */
+public class OrcUtils extends BaseFileUtils {
+
+  /**
+   * Fetch {@link HoodieKey}s from the given ORC file.
+   *
+   * @param filePath      The ORC file path.
+   * @param configuration configuration to build fs object
+   * @return {@link List} of {@link HoodieKey}s fetched from the ORC file
+   */
+  @Override
+  public List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath) {
+    List<HoodieKey> hoodieKeys = new ArrayList<>();
+    try {
+      if (!filePath.getFileSystem(configuration).exists(filePath)) {
+        return new ArrayList<>();
+      }
+
+      Configuration conf = new Configuration(configuration);
+      conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf());
+      Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf));
+
+      Schema readSchema = HoodieAvroUtils.getRecordKeyPartitionPathSchema();
+      TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(readSchema);
+      List<String> fieldNames = orcSchema.getFieldNames();
+      VectorizedRowBatch batch = orcSchema.createRowBatch();
+      RecordReader recordReader = reader.rows(new Options(conf).schema(orcSchema));
+
+      // column indices for the RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD fields
+      int keyCol = -1;
+      int partitionCol = -1;
+      for (int i = 0; i < fieldNames.size(); i++) {
+        if (fieldNames.get(i).equals(HoodieRecord.RECORD_KEY_METADATA_FIELD)) {
+          keyCol = i;
+        }
+        if (fieldNames.get(i).equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD)) {
+          partitionCol = i;
+        }
+      }
+      if (keyCol == -1 || partitionCol == -1) {
+        throw new HoodieException(String.format("Couldn't find row keys or partition path in %s.", filePath));
+      }
+      while (recordReader.nextBatch(batch)) {
+        BytesColumnVector rowKeys = (BytesColumnVector) batch.cols[keyCol];
+        BytesColumnVector partitionPaths = (BytesColumnVector) batch.cols[partitionCol];
+        for (int i = 0; i < batch.size; i++) {
+          String rowKey = rowKeys.toString(i);
+          String partitionPath = partitionPaths.toString(i);
+          hoodieKeys.add(new HoodieKey(rowKey, partitionPath));
+        }
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to read from ORC file: " + filePath, e);
+    }
+    return hoodieKeys;
+  }
+
+  /**
+   * NOTE: This literally reads the entire file contents, thus should be used with caution.
+   */
+  @Override
+  public List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath) {
+    Schema avroSchema;
+    try {
+      Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(configuration));
+      avroSchema = AvroOrcUtils.createAvroSchema(reader.getSchema());
+    } catch (IOException io) {
+      throw new HoodieIOException("Unable to read Avro records from an ORC file: " + filePath, io);
+    }
+    return readAvroRecords(configuration, filePath, avroSchema);
+  }
+
+  /**
+   * NOTE: This literally reads the entire file contents, thus should be used with caution.
+   */
+  @Override
+  public List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath, Schema avroSchema) {
+    List<GenericRecord> records = new ArrayList<>();
+    try {
+      Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(configuration));
+      TypeDescription orcSchema = reader.getSchema();
+      RecordReader recordReader = reader.rows(new Options(configuration).schema(orcSchema));
+      OrcReaderIterator<GenericRecord> iterator = new OrcReaderIterator<>(recordReader, avroSchema, orcSchema);
+      while (iterator.hasNext()) {
+        GenericRecord record = iterator.next();
+        records.add(record);
+      }
+    } catch (IOException io) {
+      throw new HoodieIOException("Unable to create an ORC reader for ORC file: " + filePath, io);
+    }
+    return records;
+  }
+
+  /**
+   * Read the rowKey list matching the given filter, from the given ORC file. If the filter is empty, then this will
+   * return all the rowkeys.
+   *
+   * @param conf configuration to build fs object.
+   * @param filePath      The ORC file path.
+   * @param filter        record keys filter
+   * @return Set of row keys matching the given filter
+   */
+  @Override
+  public Set<String> filterRowKeys(Configuration conf, Path filePath, Set<String> filter)
+      throws HoodieIOException {
+    try {
+      Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf));
+      Set<String> filteredRowKeys = new HashSet<>();
+      TypeDescription schema = reader.getSchema();
+      List<String> fieldNames = schema.getFieldNames();
+      VectorizedRowBatch batch = schema.createRowBatch();
+      RecordReader recordReader = reader.rows(new Options(conf).schema(schema));
+
+      // column index for the RECORD_KEY_METADATA_FIELD field
+      int colIndex = -1;
+      for (int i = 0; i < fieldNames.size(); i++) {
+        if (fieldNames.get(i).equals(HoodieRecord.RECORD_KEY_METADATA_FIELD)) {
+          colIndex = i;
+          break;
+        }
+      }
+      if (colIndex == -1) {
+        throw new HoodieException(String.format("Couldn't find row keys in %s.", filePath));
+      }
+      while (recordReader.nextBatch(batch)) {
+        BytesColumnVector rowKeys = (BytesColumnVector) batch.cols[colIndex];
+        for (int i = 0; i < batch.size; i++) {
+          String rowKey = rowKeys.toString(i);
+          if (filter.isEmpty() || filter.contains(rowKey)) {
+            filteredRowKeys.add(rowKey);
+          }
+        }
+      }
+      return filteredRowKeys;
+    } catch (IOException io) {
+      throw new HoodieIOException("Unable to read row keys for ORC file: " + filePath, io);
+    }
+  }
+
+  @Override
+  public Map<String, String> readFooter(Configuration conf, boolean required,
+                                        Path orcFilePath, String... footerNames) {
+    try {
+      Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf));
+      Map<String, String> footerVals = new HashMap<>();
+      List<UserMetadataItem> metadataItemList = reader.getFileTail().getFooter().getMetadataList();
+      Map<String, String> metadata = metadataItemList.stream().collect(Collectors.toMap(
+          UserMetadataItem::getName,
+          metadataItem -> metadataItem.getValue().toStringUtf8()));
+      for (String footerName : footerNames) {
+        if (metadata.containsKey(footerName)) {
+          footerVals.put(footerName, metadata.get(footerName));
+        } else if (required) {
+          throw new MetadataNotFoundException(
+              "Could not find index in ORC footer. Looked for key " + footerName + " in "
+                  + orcFilePath);
+        }
+      }
+      return footerVals;
+    } catch (IOException io) {
+      throw new HoodieIOException("Unable to read footer for ORC file: " + orcFilePath, io);
+    }
+  }
+
+  @Override
+  public Schema readAvroSchema(Configuration conf, Path orcFilePath) {
+    try {
+      Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf));
+      TypeDescription orcSchema = reader.getSchema();
+      return AvroOrcUtils.createAvroSchema(orcSchema);
+    } catch (IOException io) {
+      throw new HoodieIOException("Unable to get Avro schema for ORC file: " + orcFilePath, io);
+    }
+  }
+
+  @Override
+  public long getRowCount(Configuration conf, Path orcFilePath) {
+    try {
+      Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf));
+      return reader.getNumberOfRows();
+    } catch (IOException io) {
+      throw new HoodieIOException("Unable to get row count for ORC file: " + orcFilePath, io);
+    }
+  }
+}
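
    A usage sketch (illustrative only; the path and candidate keys are hypothetical) for the new OrcUtils:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.model.HoodieKey;
    import org.apache.hudi.common.util.OrcUtils;

    public class OrcUtilsSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        Path orcFile = new Path("/tmp/hoodie/2021/06/15/data.orc");  // hypothetical path
        OrcUtils orcUtils = new OrcUtils();

        // All (record key, partition path) pairs in the file
        List<HoodieKey> keys = orcUtils.fetchRecordKeyPartitionPath(conf, orcFile);

        // Row keys that match the candidate set (an empty set returns all row keys)
        Set<String> candidates = new HashSet<>(Arrays.asList("key1", "key2"));  // hypothetical keys
        Set<String> matching = orcUtils.filterRowKeys(conf, orcFile, candidates);

        System.out.println(keys.size() + " keys total, " + matching.size() + " matching");
      }
    }
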
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index c7b3a3f..bd44724 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -19,14 +19,9 @@
 package org.apache.hudi.common.util;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.avro.HoodieAvroWriteSupport;
-import org.apache.hudi.common.bloom.BloomFilter;
-import org.apache.hudi.common.bloom.BloomFilterFactory;
-import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.MetadataNotFoundException;
 
@@ -58,18 +53,6 @@ import java.util.function.Function;
 public class ParquetUtils extends BaseFileUtils {
 
   /**
-   * Read the rowKey list from the given parquet file.
-   *
-   * @param filePath      The parquet file path.
-   * @param configuration configuration to build fs object
-   * @return Set Set of row keys
-   */
-  @Override
-  public Set<String> readRowKeys(Configuration configuration, Path filePath) {
-    return filterRowKeys(configuration, filePath, new HashSet<>());
-  }
-
-  /**
    * Read the rowKey list matching the given filter, from the given parquet 
file. If the filter is empty, then this will
    * return all the rowkeys.
    *
@@ -196,47 +179,8 @@ public class ParquetUtils extends BaseFileUtils {
 
   @Override
   public Schema readAvroSchema(Configuration configuration, Path 
parquetFilePath) {
-    return new 
AvroSchemaConverter(configuration).convert(readSchema(configuration, 
parquetFilePath));
-  }
-
-  /**
-   * Read out the bloom filter from the parquet file meta data.
-   */
-  @Override
-  public BloomFilter readBloomFilterFromMetadata(Configuration configuration, 
Path parquetFilePath) {
-    Map<String, String> footerVals =
-        readFooter(configuration, false, parquetFilePath,
-            HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY,
-            HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY,
-            HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE);
-    String footerVal = 
footerVals.get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
-    if (null == footerVal) {
-      // We use old style key "com.uber.hoodie.bloomfilter"
-      footerVal = 
footerVals.get(HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY);
-    }
-    BloomFilter toReturn = null;
-    if (footerVal != null) {
-      if 
(footerVals.containsKey(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE)) {
-        toReturn = BloomFilterFactory.fromString(footerVal,
-            
footerVals.get(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE));
-      } else {
-        toReturn = BloomFilterFactory.fromString(footerVal, 
BloomFilterTypeCode.SIMPLE.name());
-      }
-    }
-    return toReturn;
-  }
-
-  @Override
-  public String[] readMinMaxRecordKeys(Configuration configuration, Path 
parquetFilePath) {
-    Map<String, String> minMaxKeys = readFooter(configuration, true, 
parquetFilePath,
-        HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER, 
HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER);
-    if (minMaxKeys.size() != 2) {
-      throw new HoodieException(
-          String.format("Could not read min/max record key out of footer 
correctly from %s. read) : %s",
-              parquetFilePath, minMaxKeys));
-    }
-    return new String[] 
{minMaxKeys.get(HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER),
-        minMaxKeys.get(HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER)};
+    MessageType parquetSchema = readSchema(configuration, parquetFilePath);
+    return new AvroSchemaConverter(configuration).convert(parquetSchema);
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
index ff559c5..f913df7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 
 import java.io.IOException;
 
+import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
 import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
 
@@ -40,6 +41,9 @@ public class HoodieFileReaderFactory {
     if (HFILE.getFileExtension().equals(extension)) {
       return newHFileFileReader(conf, path);
     }
+    if (ORC.getFileExtension().equals(extension)) {
+      return newOrcFileReader(conf, path);
+    }
 
     throw new UnsupportedOperationException(extension + " format not supported yet.");
   }
@@ -52,4 +56,8 @@ public class HoodieFileReaderFactory {
     CacheConfig cacheConfig = new CacheConfig(conf);
     return new HoodieHFileReader<>(conf, path, cacheConfig);
   }
+
+  private static <R extends IndexedRecord> HoodieFileReader<R> newOrcFileReader(Configuration conf, Path path) {
+    return new HoodieOrcReader<>(conf, path);
+  }
 }
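
    A usage sketch (illustrative only): with the ORC branch added above, the factory can hand back an
    ORC-backed reader. This assumes the public entry point wrapping these extension checks is
    getFileReader(Configuration, Path), which is not shown in this hunk, and the path is hypothetical.

    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.io.storage.HoodieFileReader;
    import org.apache.hudi.io.storage.HoodieFileReaderFactory;

    public class FileReaderFactorySketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path orcFile = new Path("/tmp/hoodie/2021/06/15/data.orc");  // hypothetical path

        // Assumed factory method; dispatches on the file extension (.parquet, .hfile, .orc)
        HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.getFileReader(conf, orcFile);
        System.out.println("total records: " + reader.getTotalRecords());
        reader.close();
      }
    }
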
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcReader.java
new file mode 100644
index 0000000..319f8d7
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcReader.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.util.AvroOrcUtils;
+import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.OrcReaderIterator;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.Reader.Options;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+public class HoodieOrcReader<R extends IndexedRecord> implements HoodieFileReader {
+  private Path path;
+  private Configuration conf;
+  private final BaseFileUtils orcUtils;
+
+  public HoodieOrcReader(Configuration configuration, Path path) {
+    this.conf = configuration;
+    this.path = path;
+    this.orcUtils = BaseFileUtils.getInstance(HoodieFileFormat.ORC);
+  }
+
+  @Override
+  public String[] readMinMaxRecordKeys() {
+    return orcUtils.readMinMaxRecordKeys(conf, path);
+  }
+
+  @Override
+  public BloomFilter readBloomFilter() {
+    return orcUtils.readBloomFilterFromMetadata(conf, path);
+  }
+
+  @Override
+  public Set<String> filterRowKeys(Set candidateRowKeys) {
+    return orcUtils.filterRowKeys(conf, path, candidateRowKeys);
+  }
+
+  @Override
+  public Iterator<R> getRecordIterator(Schema schema) throws IOException {
+    try {
+      Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+      TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(schema);
+      RecordReader recordReader = reader.rows(new Options(conf).schema(orcSchema));
+      return new OrcReaderIterator(recordReader, schema, orcSchema);
+    } catch (IOException io) {
+      throw new HoodieIOException("Unable to create an ORC reader.", io);
+    }
+  }
+
+  @Override
+  public Schema getSchema() {
+    return orcUtils.readAvroSchema(conf, path);
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public long getTotalRecords() {
+    return orcUtils.getRowCount(conf, path);
+  }
+}
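
    A usage sketch (illustrative only; the file path is hypothetical) for the new HoodieOrcReader:

    import java.util.Iterator;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.IndexedRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.io.storage.HoodieOrcReader;

    public class HoodieOrcReaderSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("/tmp/hoodie/2021/06/15/data.orc");  // hypothetical path

        HoodieOrcReader<IndexedRecord> reader = new HoodieOrcReader<>(conf, path);
        Schema schema = reader.getSchema();            // Avro schema derived from the ORC file schema
        long totalRecords = reader.getTotalRecords();  // row count from ORC metadata

        Iterator<IndexedRecord> it = reader.getRecordIterator(schema);
        while (it.hasNext()) {
          IndexedRecord record = it.next();
          // process record ...
        }
        System.out.println("read " + totalRecords + " records");
        reader.close();
      }
    }
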
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestAvroOrcUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestAvroOrcUtils.java
new file mode 100644
index 0000000..b775a37
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestAvroOrcUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.orc.TypeDescription;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestAvroOrcUtils extends HoodieCommonTestHarness {
+
+  public static List<Arguments> testCreateOrcSchemaArgs() {
+    // The ORC schema is constructed in the same field order as AVRO_SCHEMA:
+    // TRIP_SCHEMA_PREFIX, EXTRA_TYPE_SCHEMA, MAP_TYPE_SCHEMA, FARE_NESTED_SCHEMA, TIP_NESTED_SCHEMA, TRIP_SCHEMA_SUFFIX
+    // The following types are tested:
+    // DATE, DECIMAL, LONG, INT, BYTES, ARRAY, RECORD, MAP, STRING, FLOAT, DOUBLE
+    TypeDescription orcSchema = TypeDescription.fromString("struct<"
+        + "timestamp:bigint,_row_key:string,rider:string,driver:string,begin_lat:double,"
+        + "begin_lon:double,end_lat:double,end_lon:double,"
+        + "distance_in_meters:int,seconds_since_epoch:bigint,weight:float,nation:binary,"
+        + "current_date:date,current_ts:bigint,height:decimal(10,6),"
+        + "city_to_state:map<string,string>,"
+        + "fare:struct<amount:double,currency:string>,"
+        + "tip_history:array<struct<amount:double,currency:string>>,"
+        + "_hoodie_is_deleted:boolean>");
+
+    // Tests the types FIXED, UNION
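+    // A nullable union such as ["int", "null"] maps to the plain ORC type (height:int),
+    // while a multi-type union maps to an ORC uniontype (id:uniontype<int,string>).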
+    String structField = "{\"type\":\"record\", \"name\":\"fare\",\"fields\": "
+        + "[{\"name\": \"amount\",\"type\": \"double\"},{\"name\": 
\"currency\", \"type\": \"string\"}]}";
+    Schema avroSchemaWithMoreTypes = new Schema.Parser().parse(
+        "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
+            + "{\"name\" : \"age\", \"type\":{\"type\": \"fixed\", \"size\": 
16, \"name\": \"fixedField\" }},"
+            + "{\"name\" : \"height\", \"type\": [\"int\", \"null\"] },"
+            + "{\"name\" : \"id\", \"type\": [\"int\", \"string\"] },"
+            + "{\"name\" : \"fare\", \"type\": [" + structField + ", \"null\"] 
}]}");
+    TypeDescription orcSchemaWithMoreTypes = TypeDescription.fromString(
+        "struct<age:binary,height:int,id:uniontype<int,string>,fare:struct<amount:double,currency:string>>");
+
+    return Arrays.asList(
+        Arguments.of(AVRO_SCHEMA, orcSchema),
+        Arguments.of(avroSchemaWithMoreTypes, orcSchemaWithMoreTypes)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("testCreateOrcSchemaArgs")
+  public void testCreateOrcSchema(Schema avroSchema, TypeDescription orcSchema) {
+    TypeDescription convertedSchema = AvroOrcUtils.createOrcSchema(avroSchema);
+    assertEquals(orcSchema, convertedSchema);
+  }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestOrcReaderIterator.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestOrcReaderIterator.java
new file mode 100644
index 0000000..b55995c
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestOrcReaderIterator.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+
+import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestOrcReaderIterator {
+  private final Path filePath = new Path(System.getProperty("java.io.tmpdir") + "/f1_1-0-1_000.orc");
+
+  @BeforeEach
+  @AfterEach
+  public void clearTempFile() {
+    File file = new File(filePath.toString());
+    if (file.exists()) {
+      file.delete();
+    }
+  }
+
+  @Test
+  public void testOrcIteratorReadData() throws Exception {
+    final Configuration conf = new Configuration();
+    Schema avroSchema = getSchemaFromResource(TestOrcReaderIterator.class, "/simple-test.avsc");
+    TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(avroSchema);
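+    // Write five rows of (name, favorite_number, favorite_color) through the ORC vectorized writer API.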
+    OrcFile.WriterOptions options = OrcFile.writerOptions(conf).setSchema(orcSchema).compress(CompressionKind.ZLIB);
+    Writer writer = OrcFile.createWriter(filePath, options);
+    VectorizedRowBatch batch = orcSchema.createRowBatch();
+    BytesColumnVector nameColumns = (BytesColumnVector) batch.cols[0];
+    LongColumnVector numberColumns = (LongColumnVector) batch.cols[1];
+    BytesColumnVector colorColumns = (BytesColumnVector) batch.cols[2];
+    for (int r = 0; r < 5; ++r) {
+      int row = batch.size++;
+      byte[] name = ("name" + r).getBytes(StandardCharsets.UTF_8);
+      nameColumns.setVal(row, name);
+      byte[] color = ("color" + r).getBytes(StandardCharsets.UTF_8);
+      colorColumns.setVal(row, color);
+      numberColumns.vector[row] = r;
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
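+    // Read the file back through OrcReaderIterator and verify every row round-trips to the expected Avro record.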
+    Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf));
+    RecordReader recordReader = reader.rows(new Reader.Options(conf).schema(orcSchema));
+    Iterator<GenericRecord> iterator = new OrcReaderIterator<>(recordReader, avroSchema, orcSchema);
+    int recordCount = 0;
+    while (iterator.hasNext()) {
+      GenericRecord record = iterator.next();
+      assertEquals("name" + recordCount, record.get("name").toString());
+      assertEquals("color" + recordCount, 
record.get("favorite_color").toString());
+      assertEquals(recordCount, record.get("favorite_number"));
+      recordCount++;
+    }
+    assertEquals(5, recordCount);
+  }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieFileReaderFactory.java b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieFileReaderFactory.java
index 13971d5..ec334bd 100644
--- a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieFileReaderFactory.java
+++ b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieFileReaderFactory.java
@@ -44,11 +44,16 @@ public class TestHoodieFileReaderFactory {
     HoodieFileReader<IndexedRecord> parquetReader = HoodieFileReaderFactory.getFileReader(hadoopConf, parquetPath);
     assertTrue(parquetReader instanceof HoodieParquetReader);
 
-    // other file format exception.
+    // log file format.
     final Path logPath = new Path("/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1");
     final Throwable thrown = assertThrows(UnsupportedOperationException.class, () -> {
       HoodieFileReader<IndexedRecord> logWriter = HoodieFileReaderFactory.getFileReader(hadoopConf, logPath);
     }, "should fail since log storage reader is not supported yet.");
     assertTrue(thrown.getMessage().contains("format not supported yet."));
+
+    // Orc file format.
+    final Path orcPath = new Path("/partition/path/f1_1-0-1_000.orc");
+    HoodieFileReader<IndexedRecord> orcReader = HoodieFileReaderFactory.getFileReader(hadoopConf, orcPath);
+    assertTrue(orcReader instanceof HoodieOrcReader);
   }
 }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index e49d012..b39ee34 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -18,6 +18,9 @@
 
 package org.apache.hudi.hadoop.utils;
 
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
@@ -123,6 +126,8 @@ public class HoodieInputFormatUtils {
         } else {
           return HoodieHFileInputFormat.class.getName();
         }
+      case ORC:
+        return OrcInputFormat.class.getName();
       default:
         throw new HoodieIOException("Hoodie InputFormat not implemented for 
base file format " + baseFileFormat);
     }
@@ -134,6 +139,8 @@ public class HoodieInputFormatUtils {
         return MapredParquetOutputFormat.class.getName();
       case HFILE:
         return MapredParquetOutputFormat.class.getName();
+      case ORC:
+        return OrcOutputFormat.class.getName();
       default:
         throw new HoodieIOException("No OutputFormat for base file format " + 
baseFileFormat);
     }
@@ -145,6 +152,8 @@ public class HoodieInputFormatUtils {
         return ParquetHiveSerDe.class.getName();
       case HFILE:
         return ParquetHiveSerDe.class.getName();
+      case ORC:
+        return OrcSerde.class.getName();
       default:
         throw new HoodieIOException("No SerDe for base file format " + 
baseFileFormat);
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index 0b8234d..32bd9a4 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -19,7 +19,7 @@ package org.apache.hudi
 
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceReadOptions._
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
 import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION_OPT_KEY}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
@@ -28,6 +28,7 @@ import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.HoodieROTablePathFilter
 import org.apache.log4j.LogManager
 import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation}
+import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.hudi.streaming.HoodieStreamSource
@@ -186,6 +187,10 @@ class DefaultSource extends RelationProvider
                                   extraReadPaths: Seq[String],
                                   metaClient: HoodieTableMetaClient): BaseRelation = {
     log.info("Loading Base File Only View  with options :" + optParams)
+    val (tableFileFormat, formatClassName) = metaClient.getTableConfig.getBaseFileFormat match {
+      case HoodieFileFormat.PARQUET => (new ParquetFileFormat, "parquet")
+      case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
+    }
 
     if (useHoodieFileIndex) {
 
@@ -198,7 +203,7 @@ class DefaultSource extends RelationProvider
         fileIndex.partitionSchema,
         fileIndex.dataSchema,
         bucketSpec = None,
-        fileFormat = new ParquetFileFormat,
+        fileFormat = tableFileFormat,
         optParams)(sqlContext.sparkSession)
     } else {
       // this is just effectively RO view only, where `path` can contain a mix of
@@ -208,12 +213,12 @@ class DefaultSource extends RelationProvider
         classOf[HoodieROTablePathFilter],
         classOf[org.apache.hadoop.fs.PathFilter])
 
-      // simply return as a regular parquet relation
+      // simply return as a regular relation
       DataSource.apply(
         sparkSession = sqlContext.sparkSession,
         paths = extraReadPaths,
         userSpecifiedSchema = Option(schema),
-        className = "parquet",
+        className = formatClassName,
         options = optParams)
         .resolveRelation()
     }
diff --git a/pom.xml b/pom.xml
index a8054c2..c1b4a99 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,6 +103,8 @@
     <hive.version>2.3.1</hive.version>
     <hive.exec.classifier>core</hive.exec.classifier>
     <metrics.version>4.1.1</metrics.version>
+    <orc.version>1.6.0</orc.version>
+    <airlift.version>0.16</airlift.version>
     <prometheus.version>0.8.0</prometheus.version>
     <http.version>4.4.1</http.version>
     <spark.version>${spark2.version}</spark.version>
