This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 86e8903  Support orc format for native batch ingestion (#8950)
86e8903 is described below

commit 86e8903523fc3d2d177963d2f95fd56441605102
Author: Jihoon Son <[email protected]>
AuthorDate: Thu Nov 28 12:45:24 2019 -0800

    Support orc format for native batch ingestion (#8950)
    
    * Support orc format for native batch ingestion
    
    * fix pom and remove wrong comment
    
    * fix unnecessary condition check
    
    * use flatMap back to handle exception properly
    
    * move exceptionThrowingIterator to intermediateRowParsingReader
    
    * runtime
---
 .../data/input/IntermediateRowParsingReader.java   | 104 ++++++--
 .../util/common/parsers/CloseableIterator.java     |  45 +---
 extensions-core/orc-extensions/pom.xml             | 259 +++++++++++++++++++-
 .../druid/data/input/orc/OrcExtensionsModule.java  |  49 +++-
 .../druid/data/input/orc/OrcInputFormat.java       |  87 +++++++
 .../org/apache/druid/data/input/orc/OrcReader.java | 162 +++++++++++++
 .../input/orc/OrcHadoopInputRowParserTest.java     |  90 ++++---
 .../apache/druid/data/input/orc/OrcReaderTest.java | 266 +++++++++++++++++++++
 extensions-core/parquet-extensions/pom.xml         |  34 +--
 .../indexing/common/task/IngestionTestBase.java    |   3 +-
 10 files changed, 975 insertions(+), 124 deletions(-)

diff --git 
a/core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
 
b/core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
index aae448b..e0506c3 100644
--- 
a/core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
+++ 
b/core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
@@ -19,13 +19,14 @@
 
 package org.apache.druid.data.input;
 
-import org.apache.druid.java.util.common.CloseableIterators;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.common.parsers.ParseException;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 
 /**
  * {@link InputEntityReader} that parses bytes into some intermediate rows 
first, and then into {@link InputRow}s.
@@ -39,25 +40,60 @@ public abstract class IntermediateRowParsingReader<T> 
implements InputEntityRead
   @Override
   public CloseableIterator<InputRow> read() throws IOException
   {
-    return intermediateRowIterator().flatMap(row -> {
-      try {
-        // since parseInputRows() returns a list, the below line always 
iterates over the list,
-        // which means it calls Iterator.hasNext() and Iterator.next() at 
least once per row.
-        // This could be unnecessary if the row wouldn't be exploded into 
multiple inputRows.
-        // If this line turned out to be a performance bottleneck, perhaps 
parseInputRows() interface might not be a
-        // good idea. Subclasses could implement read() with some duplicate 
codes to avoid unnecessary iteration on
-        // a singleton list.
-        return 
CloseableIterators.withEmptyBaggage(parseInputRows(row).iterator());
+    final CloseableIterator<T> intermediateRowIterator = 
intermediateRowIterator();
+
+    return new CloseableIterator<InputRow>()
+    {
+      // since parseInputRows() returns a list, the below line always iterates 
over the list,
+      // which means it calls Iterator.hasNext() and Iterator.next() at least 
once per row.
+      // This could be unnecessary if the row wouldn't be exploded into 
multiple inputRows.
+      // If this line turned out to be a performance bottleneck, perhaps 
parseInputRows() interface might not be a
+      // good idea. Subclasses could implement read() with some duplicated 
code to avoid unnecessary iteration on
+      // a singleton list.
+      Iterator<InputRow> rows = null;
+
+      @Override
+      public boolean hasNext()
+      {
+        if (rows == null || !rows.hasNext()) {
+          if (!intermediateRowIterator.hasNext()) {
+            return false;
+          }
+          final T row = intermediateRowIterator.next();
+          try {
+            rows = parseInputRows(row).iterator();
+          }
+          catch (IOException e) {
+            rows = new ExceptionThrowingIterator(new ParseException(e, "Unable 
to parse row [%s]", row));
+          }
+          catch (ParseException e) {
+            rows = new ExceptionThrowingIterator(e);
+          }
+        }
+
+        return true;
       }
-      catch (IOException e) {
-        throw new ParseException(e, "Unable to parse row [%s]", row);
+
+      @Override
+      public InputRow next()
+      {
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+
+        return rows.next();
       }
-    });
+
+      @Override
+      public void close() throws IOException
+      {
+        intermediateRowIterator.close();
+      }
+    };
   }
 
   @Override
-  public CloseableIterator<InputRowListPlusRawValues> sample()
-      throws IOException
+  public CloseableIterator<InputRowListPlusRawValues> sample() throws 
IOException
   {
     return intermediateRowIterator().map(row -> {
       final Map<String, Object> rawColumns;
@@ -87,6 +123,9 @@ public abstract class IntermediateRowParsingReader<T> 
implements InputEntityRead
 
   /**
    * Parses the given intermediate row into a list of {@link InputRow}s.
+   * This should return a non-empty list.
+   *
+   * @throws ParseException if it cannot parse the given intermediateRow 
properly
    */
   protected abstract List<InputRow> parseInputRows(T intermediateRow) throws 
IOException, ParseException;
 
@@ -95,4 +134,39 @@ public abstract class IntermediateRowParsingReader<T> 
implements InputEntityRead
    * Implementations can use any method to convert the given row into a Map.
    */
   protected abstract Map<String, Object> toMap(T intermediateRow) throws 
IOException;
+
+  private static class ExceptionThrowingIterator implements 
CloseableIterator<InputRow>
+  {
+    private final Exception exception;
+
+    private boolean thrown = false;
+
+    private ExceptionThrowingIterator(Exception exception)
+    {
+      this.exception = exception;
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      return !thrown;
+    }
+
+    @Override
+    public InputRow next()
+    {
+      thrown = true;
+      if (exception instanceof RuntimeException) {
+        throw (RuntimeException) exception;
+      } else {
+        throw new RuntimeException(exception);
+      }
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+      // do nothing
+    }
+  }
 }
diff --git 
a/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
 
b/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
index 3753b3f..45cda5c 100644
--- 
a/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
+++ 
b/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
@@ -80,14 +80,8 @@ public interface CloseableIterator<T> extends Iterator<T>, 
Closeable
               throw new UncheckedIOException(e);
             }
           }
-          try {
-            iterator = function.apply(delegate.next());
-            if (iterator.hasNext()) {
-              return iterator;
-            }
-          }
-          catch (Exception e) {
-            iterator = new ExceptionThrowingIterator<>(e);
+          iterator = function.apply(delegate.next());
+          if (iterator.hasNext()) {
             return iterator;
           }
         }
@@ -121,39 +115,4 @@ public interface CloseableIterator<T> extends Iterator<T>, 
Closeable
       }
     };
   }
-
-  class ExceptionThrowingIterator<T> implements CloseableIterator<T>
-  {
-    private final Exception exception;
-
-    private boolean thrown = false;
-
-    private ExceptionThrowingIterator(Exception exception)
-    {
-      this.exception = exception;
-    }
-
-    @Override
-    public boolean hasNext()
-    {
-      return !thrown;
-    }
-
-    @Override
-    public T next()
-    {
-      thrown = true;
-      if (exception instanceof RuntimeException) {
-        throw (RuntimeException) exception;
-      } else {
-        throw new RuntimeException(exception);
-      }
-    }
-
-    @Override
-    public void close() throws IOException
-    {
-      // do nothing
-    }
-  }
 }
diff --git a/extensions-core/orc-extensions/pom.xml 
b/extensions-core/orc-extensions/pom.xml
index c739bed..177e403 100644
--- a/extensions-core/orc-extensions/pom.xml
+++ b/extensions-core/orc-extensions/pom.xml
@@ -49,12 +49,6 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-client</artifactId>
-            <version>${hadoop.compile.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
             <groupId>org.apache.orc</groupId>
             <artifactId>orc-mapreduce</artifactId>
             <version>${orc.version}</version>
@@ -178,12 +172,253 @@
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-mapreduce-client-core</artifactId>
-            <scope>provided</scope>
+            <scope>compile</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>aopalliance</groupId>
+                    <artifactId>aopalliance</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-compress</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.inject</groupId>
+                    <artifactId>guice</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.inject.extensions</groupId>
+                    <artifactId>guice-servlet</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-annotations</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.inject</groupId>
+                    <artifactId>javax</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs-client</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+        <!--
+        for native batch indexing with Orc files, we require a small number of 
classes provided by hadoop-common and
+        hadoop-mapreduce-client-core. However, both of these jars have a very 
large set of dependencies, the majority of
+        which we do not need (and are provided by Hadoop in that environment). 
hadoop-common is the biggest offender,
+        with things like zookeeper, jetty, just .. so much stuff. These 
exclusions remove ~60 jars from being unnecessarily
+        bundled with this extension. There might be some alternative 
arrangement to get what we need, worth looking into if
+        anyone is feeling adventurous.
+        -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
-            <scope>provided</scope>
+            <scope>compile</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.yetus</groupId>
+                    <artifactId>audience-annotations</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.directory.server</groupId>
+                    <artifactId>apacheds-kerberos-codec</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-beanutils</groupId>
+                    <artifactId>commons-beanutils-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-cli</groupId>
+                    <artifactId>commons-cli</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-codec</groupId>
+                    <artifactId>commons-codec</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-compress</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-io</groupId>
+                    <artifactId>commons-io</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-lang</groupId>
+                    <artifactId>commons-lang</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-collections</groupId>
+                    <artifactId>commons-collections</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-math3</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-net</groupId>
+                    <artifactId>commons-net</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.curator</groupId>
+                    <artifactId>curator-client</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.curator</groupId>
+                    <artifactId>curator-recipes</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.curator</groupId>
+                    <artifactId>curator-framework</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.code.gson</groupId>
+                    <artifactId>gson</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpclient</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpcore</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-annotations</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.codehaus.jackson</groupId>
+                    <artifactId>jackson-mapper-asl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-server</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-json</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>jetty-util</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>jetty-sslengine</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>jetty</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>net.java.dev.jets3t</groupId>
+                    <artifactId>jets3t</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.codehaus.jackson</groupId>
+                    <artifactId>jackson-core-asl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.code.findbugs</groupId>
+                    <artifactId>jsr305</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.ws.rs</groupId>
+                    <artifactId>jsr311-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.servlet.jsp</groupId>
+                    <artifactId>jsp-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.jcraft</groupId>
+                    <artifactId>jsch</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>xmlenc</groupId>
+                    <artifactId>xmlenc</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.nimbusds</groupId>
+                    <artifactId>nimbus-jose-jwt</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>com.google.inject</groupId>
@@ -213,6 +448,7 @@
         <dependency>
             <groupId>org.apache.hive</groupId>
             <artifactId>hive-storage-api</artifactId>
+            <scope>compile</scope>
             <exclusions>
                 <exclusion>
                     <groupId>commons-lang</groupId>
@@ -229,5 +465,12 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-core</artifactId>
+            <version>${project.parent.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git 
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcExtensionsModule.java
 
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcExtensionsModule.java
index 138f85e..e57995b 100644
--- 
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcExtensionsModule.java
+++ 
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcExtensionsModule.java
@@ -23,13 +23,26 @@ import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.inject.Binder;
+import com.google.inject.Inject;
 import org.apache.druid.initialization.DruidModule;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Properties;
 
 public class OrcExtensionsModule implements DruidModule
 {
+  private Properties props = null;
+
+  @Inject
+  public void setProperties(Properties props)
+  {
+    this.props = props;
+  }
+
   @Override
   public List<? extends Module> getJacksonModules()
   {
@@ -37,13 +50,45 @@ public class OrcExtensionsModule implements DruidModule
         new SimpleModule("OrcInputRowParserModule")
             .registerSubtypes(
                 new NamedType(OrcHadoopInputRowParser.class, "orc"),
-                new NamedType(OrcParseSpec.class, "orc")
-            )
+                new NamedType(OrcParseSpec.class, "orc"),
+                new NamedType(OrcInputFormat.class, "orc")
+      )
     );
   }
 
   @Override
   public void configure(Binder binder)
   {
+    // this block of code is common among extensions that use Hadoop things 
but are not running in Hadoop, in order
+    // to properly initialize everything
+
+    final Configuration conf = new Configuration();
+
+    // Set explicit CL. Otherwise it'll try to use thread context CL, which 
may not have all of our dependencies.
+    conf.setClassLoader(getClass().getClassLoader());
+
+    // Ensure that FileSystem class level initialization happens with correct 
CL
+    // See https://github.com/apache/incubator-druid/issues/1714
+    ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
+    try {
+      
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+      FileSystem.get(conf);
+    }
+    catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(currCtxCl);
+    }
+
+    if (props != null) {
+      for (String propName : props.stringPropertyNames()) {
+        if (propName.startsWith("hadoop.")) {
+          conf.set(propName.substring("hadoop.".length()), 
props.getProperty(propName));
+        }
+      }
+    }
+
+    binder.bind(Configuration.class).toInstance(conf);
   }
 }
diff --git 
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcInputFormat.java
 
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcInputFormat.java
new file mode 100644
index 0000000..bf60fb9
--- /dev/null
+++ 
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcInputFormat.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.orc;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.NestedInputFormat;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.hadoop.conf.Configuration;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.Objects;
+
+public class OrcInputFormat extends NestedInputFormat
+{
+  private final boolean binaryAsString;
+  private final Configuration conf;
+
+  @JsonCreator
+  public OrcInputFormat(
+      @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec,
+      @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString,
+      @JacksonInject Configuration conf
+  )
+  {
+    super(flattenSpec);
+    this.binaryAsString = binaryAsString == null ? false : binaryAsString;
+    this.conf = conf;
+  }
+
+  @Override
+  public boolean isSplittable()
+  {
+    return false;
+  }
+
+  @Override
+  public InputEntityReader createReader(InputRowSchema inputRowSchema, 
InputEntity source, File temporaryDirectory)
+  {
+    return new OrcReader(conf, inputRowSchema, source, temporaryDirectory, 
getFlattenSpec(), binaryAsString);
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    OrcInputFormat that = (OrcInputFormat) o;
+    return binaryAsString == that.binaryAsString &&
+           Objects.equals(conf, that.conf);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(super.hashCode(), binaryAsString, conf);
+  }
+}
diff --git 
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
 
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
new file mode 100644
index 0000000..e9b6817
--- /dev/null
+++ 
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.orc;
+
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputEntity.CleanableFile;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.IntermediateRowParsingReader;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.java.util.common.parsers.ObjectFlattener;
+import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcMapredRecordReader;
+import org.apache.orc.mapred.OrcStruct;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+public class OrcReader extends IntermediateRowParsingReader<OrcStruct>
+{
+  private final Configuration conf;
+  private final InputRowSchema inputRowSchema;
+  private final InputEntity source;
+  private final File temporaryDirectory;
+  private final ObjectFlattener<OrcStruct> orcStructFlattener;
+
+  OrcReader(
+      Configuration conf,
+      InputRowSchema inputRowSchema,
+      InputEntity source,
+      File temporaryDirectory,
+      JSONPathSpec flattenSpec,
+      boolean binaryAsString
+  )
+  {
+    this.conf = conf;
+    this.inputRowSchema = inputRowSchema;
+    this.source = source;
+    this.temporaryDirectory = temporaryDirectory;
+    this.orcStructFlattener = ObjectFlatteners.create(flattenSpec, new 
OrcStructFlattenerMaker(binaryAsString));
+  }
+
+  @Override
+  protected CloseableIterator<OrcStruct> intermediateRowIterator() throws 
IOException
+  {
+    final Closer closer = Closer.create();
+
+    // We fetch here to cache a copy locally. However, this might need to be 
changed if we want to split an orc file
+    // into several InputSplits in the future.
+    final byte[] buffer = new byte[InputEntity.DEFAULT_FETCH_BUFFER_SIZE];
+    final CleanableFile file = 
closer.register(source.fetch(temporaryDirectory, buffer));
+    final Path path = new Path(file.file().toURI());
+
+    final ClassLoader currentClassLoader = 
Thread.currentThread().getContextClassLoader();
+    final Reader reader;
+    try {
+      
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+      reader = closer.register(OrcFile.createReader(path, 
OrcFile.readerOptions(conf)));
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(currentClassLoader);
+    }
+    // The below line will get the schema to read all the columns.
+    // This can be improved in the future by projecting only the columns that 
users want.
+    final TypeDescription schema = reader.getSchema();
+    final RecordReader batchReader = reader.rows(reader.options());
+    final OrcMapredRecordReader<OrcStruct> recordReader = new 
OrcMapredRecordReader<>(batchReader, schema);
+    closer.register(recordReader::close);
+    return new CloseableIterator<OrcStruct>()
+    {
+      final NullWritable key = recordReader.createKey();
+      OrcStruct value = null;
+
+      @Override
+      public boolean hasNext()
+      {
+        if (value == null) {
+          try {
+            // The returned OrcStruct in next() can be kept in memory for a 
while.
+            // Here, we create a new instance of OrcStruct before calling 
RecordReader.next(),
+            // so that we avoid sharing the same reference to the "value" 
across rows.
+            value = recordReader.createValue();
+            if (!recordReader.next(key, value)) {
+              value = null;
+            }
+          }
+          catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+        return value != null;
+      }
+
+      @Override
+      public OrcStruct next()
+      {
+        if (value == null) {
+          throw new NoSuchElementException();
+        }
+        final OrcStruct currentValue = value;
+        value = null;
+        return currentValue;
+      }
+
+      @Override
+      public void close() throws IOException
+      {
+        closer.close();
+      }
+    };
+  }
+
+  @Override
+  protected List<InputRow> parseInputRows(OrcStruct intermediateRow) throws 
ParseException
+  {
+    return Collections.singletonList(
+        MapInputRowParser.parse(
+            inputRowSchema.getTimestampSpec(),
+            inputRowSchema.getDimensionsSpec(),
+            orcStructFlattener.flatten(intermediateRow)
+        )
+    );
+  }
+
+  @Override
+  protected Map<String, Object> toMap(OrcStruct intermediateRow)
+  {
+    return orcStructFlattener.toMap(intermediateRow);
+  }
+}
diff --git 
a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParserTest.java
 
b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParserTest.java
index e588e60..38eb5da 100644
--- 
a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParserTest.java
+++ 
b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParserTest.java
@@ -27,16 +27,15 @@ import org.apache.druid.indexer.path.StaticPathSpec;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.orc.mapred.OrcInputFormat;
 import org.apache.orc.mapred.OrcStruct;
-import org.apache.orc.mapreduce.OrcInputFormat;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -44,11 +43,12 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.NoSuchElementException;
 
 public class OrcHadoopInputRowParserTest
 {
   @Test
-  public void testTest1() throws IOException, InterruptedException
+  public void testTest1() throws IOException
   {
     // total auto-discover fields (no flattenSpec, no dimensionSpec)
     HadoopDruidIndexerConfig config = 
loadHadoopDruidIndexerConfig("example/test_1_hadoop_job.json");
@@ -72,7 +72,7 @@ public class OrcHadoopInputRowParserTest
   }
 
   @Test
-  public void testTest2() throws IOException, InterruptedException
+  public void testTest2() throws IOException
   {
     HadoopDruidIndexerConfig config = 
loadHadoopDruidIndexerConfig("example/test_2_hadoop_job.json");
     Job job = Job.getInstance(new Configuration());
@@ -97,7 +97,7 @@ public class OrcHadoopInputRowParserTest
   }
 
   @Test
-  public void testOrcFile11Format() throws IOException, InterruptedException
+  public void testOrcFile11Format() throws IOException
   {
     // not sure what file 11 format means, but we'll test it!
 
@@ -133,8 +133,8 @@ public class OrcHadoopInputRowParserTest
 
     // first row has empty 'map' column, so lets read another!
     List<InputRow> allRows = getAllRows(config);
-    InputRow anotherRow = allRows.get(0);
-    Assert.assertEquals(14, rows.get(0).getDimensions().size());
+    InputRow anotherRow = allRows.get(allRows.size() - 1);
+    Assert.assertEquals(14, anotherRow.getDimensions().size());
     Assert.assertEquals("true", anotherRow.getDimension("boolean1").get(0));
     Assert.assertEquals("100", anotherRow.getDimension("byte1").get(0));
     Assert.assertEquals("2048", anotherRow.getDimension("short1").get(0));
@@ -142,7 +142,7 @@ public class OrcHadoopInputRowParserTest
     Assert.assertEquals("9223372036854775807", 
anotherRow.getDimension("long1").get(0));
     Assert.assertEquals("2.0", anotherRow.getDimension("float1").get(0));
     Assert.assertEquals("-5.0", anotherRow.getDimension("double1").get(0));
-    Assert.assertEquals("AAECAwQAAA==", 
rows.get(0).getDimension("bytes1").get(0));
+    Assert.assertEquals("", anotherRow.getDimension("bytes1").get(0));
     Assert.assertEquals("bye", anotherRow.getDimension("string1").get(0));
     Assert.assertEquals("1.23456786547457E7", 
anotherRow.getDimension("decimal1").get(0));
     Assert.assertEquals("2", 
anotherRow.getDimension("struct_list_struct_int").get(0));
@@ -151,7 +151,7 @@ public class OrcHadoopInputRowParserTest
   }
 
   @Test
-  public void testOrcSplitElim() throws IOException, InterruptedException
+  public void testOrcSplitElim() throws IOException
   {
     // not sure what SplitElim means, but we'll test it!
 
@@ -175,7 +175,7 @@ public class OrcHadoopInputRowParserTest
   }
 
   @Test
-  public void testDate1900() throws IOException, InterruptedException
+  public void testDate1900() throws IOException
   {
     /*
       TestOrcFile.testDate1900.orc
@@ -194,7 +194,7 @@ public class OrcHadoopInputRowParserTest
   }
 
   @Test
-  public void testDate2038() throws IOException, InterruptedException
+  public void testDate2038() throws IOException
   {
     /*
       TestOrcFile.testDate2038.orc
@@ -217,54 +217,68 @@ public class OrcHadoopInputRowParserTest
     return HadoopDruidIndexerConfig.fromFile(new File(configPath));
   }
 
-  private static OrcStruct getFirstRow(Job job, String orcPath) throws 
IOException, InterruptedException
+  private static OrcStruct getFirstRow(Job job, String orcPath) throws 
IOException
   {
     File testFile = new File(orcPath);
     Path path = new Path(testFile.getAbsoluteFile().toURI());
-    FileSplit split = new FileSplit(path, 0, testFile.length(), null);
+    FileSplit split = new FileSplit(path, 0, testFile.length(), new 
String[]{"host"});
 
-    InputFormat inputFormat = ReflectionUtils.newInstance(
+    InputFormat<NullWritable, OrcStruct> inputFormat = 
ReflectionUtils.newInstance(
         OrcInputFormat.class,
         job.getConfiguration()
     );
-    TaskAttemptContext context = new 
TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
-
-    try (RecordReader reader = inputFormat.createRecordReader(split, context)) 
{
-
-      reader.initialize(split, context);
-      reader.nextKeyValue();
-      return (OrcStruct) reader.getCurrentValue();
+    RecordReader<NullWritable, OrcStruct> reader = inputFormat.getRecordReader(
+        split,
+        new JobConf(job.getConfiguration()),
+        null
+    );
+    try {
+      final NullWritable key = reader.createKey();
+      final OrcStruct value = reader.createValue();
+      if (reader.next(key, value)) {
+        return value;
+      } else {
+        throw new NoSuchElementException();
+      }
+    }
+    finally {
+      reader.close();
     }
   }
 
-  private static List<InputRow> getAllRows(HadoopDruidIndexerConfig config)
-      throws IOException, InterruptedException
+  private static List<InputRow> getAllRows(HadoopDruidIndexerConfig config) 
throws IOException
   {
     Job job = Job.getInstance(new Configuration());
     config.intoConfiguration(job);
 
     File testFile = new File(((StaticPathSpec) 
config.getPathSpec()).getPaths());
     Path path = new Path(testFile.getAbsoluteFile().toURI());
-    FileSplit split = new FileSplit(path, 0, testFile.length(), null);
+    FileSplit split = new FileSplit(path, 0, testFile.length(), new 
String[]{"host"});
 
-    InputFormat inputFormat = ReflectionUtils.newInstance(
+    InputFormat<NullWritable, OrcStruct> inputFormat = 
ReflectionUtils.newInstance(
         OrcInputFormat.class,
         job.getConfiguration()
     );
-    TaskAttemptContext context = new 
TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
-
-    try (RecordReader reader = inputFormat.createRecordReader(split, context)) 
{
+    RecordReader<NullWritable, OrcStruct> reader = inputFormat.getRecordReader(
+        split,
+        new JobConf(job.getConfiguration()),
+        null
+    );
+    try {
       List<InputRow> records = new ArrayList<>();
       InputRowParser parser = config.getParser();
+      final NullWritable key = reader.createKey();
+      OrcStruct value = reader.createValue();
 
-      reader.initialize(split, context);
-      while (reader.nextKeyValue()) {
-        reader.nextKeyValue();
-        Object data = reader.getCurrentValue();
-        records.add(((List<InputRow>) parser.parseBatch(data)).get(0));
+      while (reader.next(key, value)) {
+        records.add(((List<InputRow>) parser.parseBatch(value)).get(0));
+        value = reader.createValue();
       }
 
       return records;
     }
+    finally {
+      reader.close();
+    }
   }
 }
diff --git 
a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java
 
b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java
new file mode 100644
index 0000000..bef9b64
--- /dev/null
+++ 
b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.orc;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.FileEntity;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+
+public class OrcReaderTest
+{
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  // This test is migrated from OrcHadoopInputRowParserTest
+  @Test
+  public void testTest1() throws IOException
+  {
+    final InputEntityReader reader = createReader(
+        new TimestampSpec("timestamp", "auto", null),
+        new 
DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("col1", 
"col2"))),
+        new OrcInputFormat(null, null, new Configuration()),
+        "example/test_1.orc"
+    );
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
+      Assert.assertTrue(iterator.hasNext());
+      final InputRow row = iterator.next();
+      Assert.assertEquals(DateTimes.of("2016-01-01T00:00:00.000Z"), 
row.getTimestamp());
+      Assert.assertEquals("bar", 
Iterables.getOnlyElement(row.getDimension("col1")));
+      Assert.assertEquals(ImmutableList.of("dat1", "dat2", "dat3"), 
row.getDimension("col2"));
+      Assert.assertEquals(1.1, row.getMetric("val1").doubleValue(), 0.001);
+      Assert.assertFalse(iterator.hasNext());
+    }
+  }
+
+  // This test is migrated from OrcHadoopInputRowParserTest
+  @Test
+  public void testTest2() throws IOException
+  {
+    final InputFormat inputFormat = new OrcInputFormat(
+        new JSONPathSpec(
+            true,
+            Collections.singletonList(new 
JSONPathFieldSpec(JSONPathFieldType.PATH, "col7-subcol7", "$.col7.subcol7"))
+        ),
+        null,
+        new Configuration()
+    );
+    final InputEntityReader reader = createReader(
+        new TimestampSpec("timestamp", "auto", null),
+        new DimensionsSpec(null),
+        inputFormat,
+        "example/test_2.orc"
+    );
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
+      Assert.assertTrue(iterator.hasNext());
+      final InputRow row = iterator.next();
+      Assert.assertEquals(DateTimes.of("2016-01-01T00:00:00.000Z"), 
row.getTimestamp());
+      Assert.assertEquals("bar", 
Iterables.getOnlyElement(row.getDimension("col1")));
+      Assert.assertEquals(ImmutableList.of("dat1", "dat2", "dat3"), 
row.getDimension("col2"));
+      Assert.assertEquals("1.1", 
Iterables.getOnlyElement(row.getDimension("col3")));
+      Assert.assertEquals("2", 
Iterables.getOnlyElement(row.getDimension("col4")));
+      Assert.assertEquals("3.5", 
Iterables.getOnlyElement(row.getDimension("col5")));
+      Assert.assertTrue(row.getDimension("col6").isEmpty());
+      Assert.assertFalse(iterator.hasNext());
+    }
+  }
+
+  // This test is migrated from OrcHadoopInputRowParserTest
+  @Test
+  public void testOrcFile11Format() throws IOException
+  {
+    final OrcInputFormat inputFormat = new OrcInputFormat(
+        new JSONPathSpec(
+            true,
+            ImmutableList.of(
+                new JSONPathFieldSpec(JSONPathFieldType.PATH, 
"struct_list_struct_int", "$.middle.list[1].int1"),
+                new JSONPathFieldSpec(JSONPathFieldType.PATH, 
"struct_list_struct_intlist", "$.middle.list[*].int1"),
+                new JSONPathFieldSpec(JSONPathFieldType.PATH, 
"list_struct_string", "$.list[0].string1"),
+                new JSONPathFieldSpec(JSONPathFieldType.PATH, 
"map_struct_int", "$.map.chani.int1")
+            )
+        ),
+        null,
+        new Configuration()
+    );
+    final InputEntityReader reader = createReader(
+        new TimestampSpec("ts", "millis", null),
+        new DimensionsSpec(null),
+        inputFormat,
+        "example/orc-file-11-format.orc"
+    );
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
+      int actualRowCount = 0;
+
+      // Check the first row
+      Assert.assertTrue(iterator.hasNext());
+      InputRow row = iterator.next();
+      actualRowCount++;
+      Assert.assertEquals("false", 
Iterables.getOnlyElement(row.getDimension("boolean1")));
+      Assert.assertEquals("1", 
Iterables.getOnlyElement(row.getDimension("byte1")));
+      Assert.assertEquals("1024", 
Iterables.getOnlyElement(row.getDimension("short1")));
+      Assert.assertEquals("65536", 
Iterables.getOnlyElement(row.getDimension("int1")));
+      Assert.assertEquals("9223372036854775807", 
Iterables.getOnlyElement(row.getDimension("long1")));
+      Assert.assertEquals("1.0", 
Iterables.getOnlyElement(row.getDimension("float1")));
+      Assert.assertEquals("-15.0", 
Iterables.getOnlyElement(row.getDimension("double1")));
+      Assert.assertEquals("AAECAwQAAA==", 
Iterables.getOnlyElement(row.getDimension("bytes1")));
+      Assert.assertEquals("hi", 
Iterables.getOnlyElement(row.getDimension("string1")));
+      Assert.assertEquals("1.23456786547456E7", 
Iterables.getOnlyElement(row.getDimension("decimal1")));
+      Assert.assertEquals("2", 
Iterables.getOnlyElement(row.getDimension("struct_list_struct_int")));
+      Assert.assertEquals(ImmutableList.of("1", "2"), 
row.getDimension("struct_list_struct_intlist"));
+      Assert.assertEquals("good", 
Iterables.getOnlyElement(row.getDimension("list_struct_string")));
+      Assert.assertEquals(DateTimes.of("2000-03-12T15:00:00.0Z"), 
row.getTimestamp());
+
+      while (iterator.hasNext()) {
+        actualRowCount++;
+        row = iterator.next();
+      }
+
+      // Check the last row
+      Assert.assertEquals("true", 
Iterables.getOnlyElement(row.getDimension("boolean1")));
+      Assert.assertEquals("100", 
Iterables.getOnlyElement(row.getDimension("byte1")));
+      Assert.assertEquals("2048", 
Iterables.getOnlyElement(row.getDimension("short1")));
+      Assert.assertEquals("65536", 
Iterables.getOnlyElement(row.getDimension("int1")));
+      Assert.assertEquals("9223372036854775807", 
Iterables.getOnlyElement(row.getDimension("long1")));
+      Assert.assertEquals("2.0", 
Iterables.getOnlyElement(row.getDimension("float1")));
+      Assert.assertEquals("-5.0", 
Iterables.getOnlyElement(row.getDimension("double1")));
+      Assert.assertEquals("", 
Iterables.getOnlyElement(row.getDimension("bytes1")));
+      Assert.assertEquals("bye", 
Iterables.getOnlyElement(row.getDimension("string1")));
+      Assert.assertEquals("1.23456786547457E7", 
Iterables.getOnlyElement(row.getDimension("decimal1")));
+      Assert.assertEquals("2", 
Iterables.getOnlyElement(row.getDimension("struct_list_struct_int")));
+      Assert.assertEquals(ImmutableList.of("1", "2"), 
row.getDimension("struct_list_struct_intlist"));
+      Assert.assertEquals("cat", 
Iterables.getOnlyElement(row.getDimension("list_struct_string")));
+      Assert.assertEquals("5", 
Iterables.getOnlyElement(row.getDimension("map_struct_int")));
+      Assert.assertEquals(DateTimes.of("2000-03-12T15:00:01.000Z"), 
row.getTimestamp());
+
+      Assert.assertEquals(7500, actualRowCount);
+    }
+  }
+
+  // This test is migrated from OrcHadoopInputRowParserTest
+  @Test
+  public void testOrcSplitElim() throws IOException
+  {
+    final InputEntityReader reader = createReader(
+        new TimestampSpec("ts", "millis", null),
+        new DimensionsSpec(null),
+        new OrcInputFormat(new JSONPathSpec(true, null), null, new 
Configuration()),
+        "example/orc_split_elim.orc"
+    );
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
+      int actualRowCount = 0;
+      Assert.assertTrue(iterator.hasNext());
+      final InputRow row = iterator.next();
+      actualRowCount++;
+      Assert.assertEquals(DateTimes.of("1969-12-31T16:00:00.0Z"), 
row.getTimestamp());
+      Assert.assertEquals("2", 
Iterables.getOnlyElement(row.getDimension("userid")));
+      Assert.assertEquals("foo", 
Iterables.getOnlyElement(row.getDimension("string1")));
+      Assert.assertEquals("0.8", 
Iterables.getOnlyElement(row.getDimension("subtype")));
+      Assert.assertEquals("1.2", 
Iterables.getOnlyElement(row.getDimension("decimal1")));
+      while (iterator.hasNext()) {
+        actualRowCount++;
+        iterator.next();
+      }
+      Assert.assertEquals(25000, actualRowCount);
+    }
+  }
+
+  // This test is migrated from OrcHadoopInputRowParserTest
+  @Test
+  public void testDate1900() throws IOException
+  {
+    final InputEntityReader reader = createReader(
+        new TimestampSpec("time", "millis", null),
+        new DimensionsSpec(null, Collections.singletonList("time"), null),
+        new OrcInputFormat(new JSONPathSpec(true, null), null, new 
Configuration()),
+        "example/TestOrcFile.testDate1900.orc"
+    );
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
+      int actualRowCount = 0;
+      Assert.assertTrue(iterator.hasNext());
+      final InputRow row = iterator.next();
+      actualRowCount++;
+      Assert.assertEquals(1, row.getDimensions().size());
+      Assert.assertEquals(DateTimes.of("1900-05-05T12:34:56.1Z"), 
row.getTimestamp());
+      Assert.assertEquals("1900-12-25T00:00:00.000Z", 
Iterables.getOnlyElement(row.getDimension("date")));
+      while (iterator.hasNext()) {
+        actualRowCount++;
+        iterator.next();
+      }
+      Assert.assertEquals(70000, actualRowCount);
+    }
+  }
+
+  // This test is migrated from OrcHadoopInputRowParserTest
+  @Test
+  public void testDate2038() throws IOException
+  {
+    final InputEntityReader reader = createReader(
+        new TimestampSpec("time", "millis", null),
+        new DimensionsSpec(null, Collections.singletonList("time"), null),
+        new OrcInputFormat(new JSONPathSpec(true, null), null, new 
Configuration()),
+        "example/TestOrcFile.testDate2038.orc"
+    );
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
+      int actualRowCount = 0;
+      Assert.assertTrue(iterator.hasNext());
+      final InputRow row = iterator.next();
+      actualRowCount++;
+      Assert.assertEquals(1, row.getDimensions().size());
+      Assert.assertEquals(DateTimes.of("2038-05-05T12:34:56.1Z"), 
row.getTimestamp());
+      Assert.assertEquals("2038-12-25T00:00:00.000Z", 
Iterables.getOnlyElement(row.getDimension("date")));
+      while (iterator.hasNext()) {
+        actualRowCount++;
+        iterator.next();
+      }
+      Assert.assertEquals(212000, actualRowCount);
+    }
+  }
+
+  private InputEntityReader createReader(
+      TimestampSpec timestampSpec,
+      DimensionsSpec dimensionsSpec,
+      InputFormat inputFormat,
+      String dataFile
+  ) throws IOException
+  {
+    final InputRowSchema schema = new InputRowSchema(timestampSpec, 
dimensionsSpec, Collections.emptyList());
+    final FileEntity entity = new FileEntity(new File(dataFile));
+    return inputFormat.createReader(schema, entity, 
temporaryFolder.newFolder());
+  }
+}
diff --git a/extensions-core/parquet-extensions/pom.xml 
b/extensions-core/parquet-extensions/pom.xml
index b19b271..936028e 100644
--- a/extensions-core/parquet-extensions/pom.xml
+++ b/extensions-core/parquet-extensions/pom.xml
@@ -288,72 +288,72 @@
           <artifactId>jersey-json</artifactId>
         </exclusion>
         <exclusion>
-          <artifactId>log4j</artifactId>
           <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
         </exclusion>
         <exclusion>
-          <artifactId>jetty-sslengine</artifactId>
           <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty-sslengine</artifactId>
         </exclusion>
         <exclusion>
-          <artifactId>jetty-util</artifactId>
           <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty-util</artifactId>
         </exclusion>
         <exclusion>
-          <artifactId>jets3t</artifactId>
           <groupId>net.java.dev.jets3t</groupId>
+          <artifactId>jets3t</artifactId>
         </exclusion>
         <exclusion>
-          <artifactId>jetty</artifactId>
           <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty</artifactId>
         </exclusion>
         <exclusion>
-          <artifactId>gson</artifactId>
           <groupId>com.google.code.gson</groupId>
+          <artifactId>gson</artifactId>
         </exclusion>
         <exclusion>
-          <artifactId>xmlenc</artifactId>
           <groupId>xmlenc</groupId>
+          <artifactId>xmlenc</artifactId>
         </exclusion>
         <exclusion>
-          <artifactId>httpclient</artifactId>
           <groupId>org.apache.httpcomponents</groupId>
+          <artifactId>httpclient</artifactId>
         </exclusion>
         <exclusion>
-          <artifactId>jsch</artifactId>
           <groupId>com.jcraft</groupId>
+          <artifactId>jsch</artifactId>
         </exclusion>
         <exclusion>
-          <artifactId>protobuf-java</artifactId>
           <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
         </exclusion>
         <exclusion>
-          <artifactId>commons-collections</artifactId>
           <groupId>commons-collections</groupId>
+          <artifactId>commons-collections</artifactId>
         </exclusion>
         <exclusion>
-          <artifactId>commons-logging</artifactId>
           <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
         </exclusion>
         <exclusion>
-          <artifactId>commons-cli</artifactId>
           <groupId>commons-cli</groupId>
+          <artifactId>commons-cli</artifactId>
         </exclusion>
         <exclusion>
-          <artifactId>commons-digester</artifactId>
           <groupId>commons-digester</groupId>
+          <artifactId>commons-digester</artifactId>
         </exclusion>
         <exclusion>
-          <artifactId>commons-beanutils-core</artifactId>
           <groupId>commons-beanutils</groupId>
+          <artifactId>commons-beanutils-core</artifactId>
         </exclusion>
         <exclusion>
-          <artifactId>apacheds-kerberos-codec</artifactId>
           <groupId>org.apache.directory.server</groupId>
+          <artifactId>apacheds-kerberos-codec</artifactId>
         </exclusion>
         <exclusion>
-          <artifactId>nimbus-jose-jwt</artifactId>
           <groupId>com.nimbusds</groupId>
+          <artifactId>nimbus-jose-jwt</artifactId>
         </exclusion>
       </exclusions>
     </dependency>
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index dcaffb3..e87e0f7 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -62,6 +62,7 @@ import org.apache.druid.segment.loading.NoopDataSegmentKiller;
 import org.apache.druid.segment.loading.SegmentLoader;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.DataSegment;
 import org.junit.After;
 import org.junit.Before;
@@ -78,7 +79,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Executor;
 
-public abstract class IngestionTestBase
+public abstract class IngestionTestBase extends InitializedNullHandlingTest
 {
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to