This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 86e8903 Support orc format for native batch ingestion (#8950)
86e8903 is described below
commit 86e8903523fc3d2d177963d2f95fd56441605102
Author: Jihoon Son <[email protected]>
AuthorDate: Thu Nov 28 12:45:24 2019 -0800
Support orc format for native batch ingestion (#8950)
* Support orc format for native batch ingestion
* fix pom and remove wrong comment
* fix unnecessary condition check
* use flatMap back to handle exception properly
* move exceptionThrowingIterator to intermediateRowParsingReader
* runtime
---
.../data/input/IntermediateRowParsingReader.java | 104 ++++++--
.../util/common/parsers/CloseableIterator.java | 45 +---
extensions-core/orc-extensions/pom.xml | 259 +++++++++++++++++++-
.../druid/data/input/orc/OrcExtensionsModule.java | 49 +++-
.../druid/data/input/orc/OrcInputFormat.java | 87 +++++++
.../org/apache/druid/data/input/orc/OrcReader.java | 162 +++++++++++++
.../input/orc/OrcHadoopInputRowParserTest.java | 90 ++++---
.../apache/druid/data/input/orc/OrcReaderTest.java | 266 +++++++++++++++++++++
extensions-core/parquet-extensions/pom.xml | 34 +--
.../indexing/common/task/IngestionTestBase.java | 3 +-
10 files changed, 975 insertions(+), 124 deletions(-)
diff --git
a/core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
b/core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
index aae448b..e0506c3 100644
---
a/core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
+++
b/core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
@@ -19,13 +19,14 @@
package org.apache.druid.data.input;
-import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
import java.io.IOException;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
/**
* {@link InputEntityReader} that parses bytes into some intermediate rows
first, and then into {@link InputRow}s.
@@ -39,25 +40,60 @@ public abstract class IntermediateRowParsingReader<T>
implements InputEntityRead
@Override
public CloseableIterator<InputRow> read() throws IOException
{
- return intermediateRowIterator().flatMap(row -> {
- try {
- // since parseInputRows() returns a list, the below line always
iterates over the list,
- // which means it calls Iterator.hasNext() and Iterator.next() at
least once per row.
- // This could be unnecessary if the row wouldn't be exploded into
multiple inputRows.
- // If this line turned out to be a performance bottleneck, perhaps
parseInputRows() interface might not be a
- // good idea. Subclasses could implement read() with some duplicate
codes to avoid unnecessary iteration on
- // a singleton list.
- return
CloseableIterators.withEmptyBaggage(parseInputRows(row).iterator());
+ final CloseableIterator<T> intermediateRowIterator =
intermediateRowIterator();
+
+ return new CloseableIterator<InputRow>()
+ {
+ // since parseInputRows() returns a list, the below line always iterates
over the list,
+ // which means it calls Iterator.hasNext() and Iterator.next() at least
once per row.
+ // This could be unnecessary if the row wouldn't be exploded into
multiple inputRows.
+ // If this line turned out to be a performance bottleneck, perhaps
parseInputRows() interface might not be a
+ // good idea. Subclasses could implement read() with some duplicated
code to avoid unnecessary iteration on
+ // a singleton list.
+ Iterator<InputRow> rows = null;
+
+ @Override
+ public boolean hasNext()
+ {
+ if (rows == null || !rows.hasNext()) {
+ if (!intermediateRowIterator.hasNext()) {
+ return false;
+ }
+ final T row = intermediateRowIterator.next();
+ try {
+ rows = parseInputRows(row).iterator();
+ }
+ catch (IOException e) {
+ rows = new ExceptionThrowingIterator(new ParseException(e, "Unable
to parse row [%s]", row));
+ }
+ catch (ParseException e) {
+ rows = new ExceptionThrowingIterator(e);
+ }
+ }
+
+ return true;
}
- catch (IOException e) {
- throw new ParseException(e, "Unable to parse row [%s]", row);
+
+ @Override
+ public InputRow next()
+ {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ return rows.next();
}
- });
+
+ @Override
+ public void close() throws IOException
+ {
+ intermediateRowIterator.close();
+ }
+ };
}
@Override
- public CloseableIterator<InputRowListPlusRawValues> sample()
- throws IOException
+ public CloseableIterator<InputRowListPlusRawValues> sample() throws
IOException
{
return intermediateRowIterator().map(row -> {
final Map<String, Object> rawColumns;
@@ -87,6 +123,9 @@ public abstract class IntermediateRowParsingReader<T>
implements InputEntityRead
/**
* Parses the given intermediate row into a list of {@link InputRow}s.
+ * This should return a non-empty list.
+ *
+ * @throws ParseException if it cannot parse the given intermediateRow
properly
*/
protected abstract List<InputRow> parseInputRows(T intermediateRow) throws
IOException, ParseException;
@@ -95,4 +134,39 @@ public abstract class IntermediateRowParsingReader<T>
implements InputEntityRead
* Implementations can use any method to convert the given row into a Map.
*/
protected abstract Map<String, Object> toMap(T intermediateRow) throws
IOException;
+
+ private static class ExceptionThrowingIterator implements
CloseableIterator<InputRow>
+ {
+ private final Exception exception;
+
+ private boolean thrown = false;
+
+ private ExceptionThrowingIterator(Exception exception)
+ {
+ this.exception = exception;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return !thrown;
+ }
+
+ @Override
+ public InputRow next()
+ {
+ thrown = true;
+ if (exception instanceof RuntimeException) {
+ throw (RuntimeException) exception;
+ } else {
+ throw new RuntimeException(exception);
+ }
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ // do nothing
+ }
+ }
}
diff --git
a/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
b/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
index 3753b3f..45cda5c 100644
---
a/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
+++
b/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
@@ -80,14 +80,8 @@ public interface CloseableIterator<T> extends Iterator<T>,
Closeable
throw new UncheckedIOException(e);
}
}
- try {
- iterator = function.apply(delegate.next());
- if (iterator.hasNext()) {
- return iterator;
- }
- }
- catch (Exception e) {
- iterator = new ExceptionThrowingIterator<>(e);
+ iterator = function.apply(delegate.next());
+ if (iterator.hasNext()) {
return iterator;
}
}
@@ -121,39 +115,4 @@ public interface CloseableIterator<T> extends Iterator<T>,
Closeable
}
};
}
-
- class ExceptionThrowingIterator<T> implements CloseableIterator<T>
- {
- private final Exception exception;
-
- private boolean thrown = false;
-
- private ExceptionThrowingIterator(Exception exception)
- {
- this.exception = exception;
- }
-
- @Override
- public boolean hasNext()
- {
- return !thrown;
- }
-
- @Override
- public T next()
- {
- thrown = true;
- if (exception instanceof RuntimeException) {
- throw (RuntimeException) exception;
- } else {
- throw new RuntimeException(exception);
- }
- }
-
- @Override
- public void close() throws IOException
- {
- // do nothing
- }
- }
}
diff --git a/extensions-core/orc-extensions/pom.xml
b/extensions-core/orc-extensions/pom.xml
index c739bed..177e403 100644
--- a/extensions-core/orc-extensions/pom.xml
+++ b/extensions-core/orc-extensions/pom.xml
@@ -49,12 +49,6 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>${hadoop.compile.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-mapreduce</artifactId>
<version>${orc.version}</version>
@@ -178,12 +172,253 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
- <scope>provided</scope>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>aopalliance</groupId>
+ <artifactId>aopalliance</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.inject</groupId>
+ <artifactId>guice</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.inject.extensions</groupId>
+ <artifactId>guice-servlet</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.inject</groupId>
+ <artifactId>javax</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <!--
+ for native batch indexing with Orc files, we require a small number of
classes provided by hadoop-common and
+ hadoop-mapreduce-client-core. However, both of these jars have a very
large set of dependencies, the majority of
+ which we do not need (and are provided by Hadoop in that environment).
hadoop-common is the biggest offender,
+ with things like zookeeper, jetty, just .. so much stuff. These
exclusions remove ~60 jars from being unnecessarily
+ bundled with this extension. There might be some alternative
arrangement to get what we need, worth looking into if
+ anyone is feeling adventurous.
+ -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
- <scope>provided</scope>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.yetus</groupId>
+ <artifactId>audience-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.directory.server</groupId>
+ <artifactId>apacheds-kerberos-codec</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-beanutils</groupId>
+ <artifactId>commons-beanutils-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math3</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-net</groupId>
+ <artifactId>commons-net</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-sslengine</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.java.dev.jets3t</groupId>
+ <artifactId>jets3t</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.ws.rs</groupId>
+ <artifactId>jsr311-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet.jsp</groupId>
+ <artifactId>jsp-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.jcraft</groupId>
+ <artifactId>jsch</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>xmlenc</groupId>
+ <artifactId>xmlenc</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.nimbusds</groupId>
+ <artifactId>nimbus-jose-jwt</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
@@ -213,6 +448,7 @@
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-storage-api</artifactId>
+ <scope>compile</scope>
<exclusions>
<exclusion>
<groupId>commons-lang</groupId>
@@ -229,5 +465,12 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-core</artifactId>
+ <version>${project.parent.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcExtensionsModule.java
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcExtensionsModule.java
index 138f85e..e57995b 100644
---
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcExtensionsModule.java
+++
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcExtensionsModule.java
@@ -23,13 +23,26 @@ import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
+import com.google.inject.Inject;
import org.apache.druid.initialization.DruidModule;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import java.util.Properties;
public class OrcExtensionsModule implements DruidModule
{
+ private Properties props = null;
+
+ @Inject
+ public void setProperties(Properties props)
+ {
+ this.props = props;
+ }
+
@Override
public List<? extends Module> getJacksonModules()
{
@@ -37,13 +50,45 @@ public class OrcExtensionsModule implements DruidModule
new SimpleModule("OrcInputRowParserModule")
.registerSubtypes(
new NamedType(OrcHadoopInputRowParser.class, "orc"),
- new NamedType(OrcParseSpec.class, "orc")
- )
+ new NamedType(OrcParseSpec.class, "orc"),
+ new NamedType(OrcInputFormat.class, "orc")
+ )
);
}
@Override
public void configure(Binder binder)
{
+ // this block of code is common among extensions that use Hadoop things
but are not running in Hadoop, in order
+ // to properly initialize everything
+
+ final Configuration conf = new Configuration();
+
+ // Set explicit CL. Otherwise it'll try to use thread context CL, which
may not have all of our dependencies.
+ conf.setClassLoader(getClass().getClassLoader());
+
+ // Ensure that FileSystem class level initialization happens with correct
CL
+ // See https://github.com/apache/incubator-druid/issues/1714
+ ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
+ try {
+
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+ FileSystem.get(conf);
+ }
+ catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ finally {
+ Thread.currentThread().setContextClassLoader(currCtxCl);
+ }
+
+ if (props != null) {
+ for (String propName : props.stringPropertyNames()) {
+ if (propName.startsWith("hadoop.")) {
+ conf.set(propName.substring("hadoop.".length()),
props.getProperty(propName));
+ }
+ }
+ }
+
+ binder.bind(Configuration.class).toInstance(conf);
}
}
diff --git
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcInputFormat.java
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcInputFormat.java
new file mode 100644
index 0000000..bf60fb9
--- /dev/null
+++
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcInputFormat.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.orc;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.NestedInputFormat;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.hadoop.conf.Configuration;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.Objects;
+
+public class OrcInputFormat extends NestedInputFormat
+{
+ private final boolean binaryAsString;
+ private final Configuration conf;
+
+ @JsonCreator
+ public OrcInputFormat(
+ @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec,
+ @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString,
+ @JacksonInject Configuration conf
+ )
+ {
+ super(flattenSpec);
+ this.binaryAsString = binaryAsString == null ? false : binaryAsString;
+ this.conf = conf;
+ }
+
+ @Override
+ public boolean isSplittable()
+ {
+ return false;
+ }
+
+ @Override
+ public InputEntityReader createReader(InputRowSchema inputRowSchema,
InputEntity source, File temporaryDirectory)
+ {
+ return new OrcReader(conf, inputRowSchema, source, temporaryDirectory,
getFlattenSpec(), binaryAsString);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ OrcInputFormat that = (OrcInputFormat) o;
+ return binaryAsString == that.binaryAsString &&
+ Objects.equals(conf, that.conf);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(super.hashCode(), binaryAsString, conf);
+ }
+}
diff --git
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
new file mode 100644
index 0000000..e9b6817
--- /dev/null
+++
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.orc;
+
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputEntity.CleanableFile;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.IntermediateRowParsingReader;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.java.util.common.parsers.ObjectFlattener;
+import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcMapredRecordReader;
+import org.apache.orc.mapred.OrcStruct;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+public class OrcReader extends IntermediateRowParsingReader<OrcStruct>
+{
+ private final Configuration conf;
+ private final InputRowSchema inputRowSchema;
+ private final InputEntity source;
+ private final File temporaryDirectory;
+ private final ObjectFlattener<OrcStruct> orcStructFlattener;
+
+ OrcReader(
+ Configuration conf,
+ InputRowSchema inputRowSchema,
+ InputEntity source,
+ File temporaryDirectory,
+ JSONPathSpec flattenSpec,
+ boolean binaryAsString
+ )
+ {
+ this.conf = conf;
+ this.inputRowSchema = inputRowSchema;
+ this.source = source;
+ this.temporaryDirectory = temporaryDirectory;
+ this.orcStructFlattener = ObjectFlatteners.create(flattenSpec, new
OrcStructFlattenerMaker(binaryAsString));
+ }
+
+ @Override
+ protected CloseableIterator<OrcStruct> intermediateRowIterator() throws
IOException
+ {
+ final Closer closer = Closer.create();
+
+ // We fetch here to cache a copy locally. However, this might need to be
changed if we want to split an orc file
+ // into several InputSplits in the future.
+ final byte[] buffer = new byte[InputEntity.DEFAULT_FETCH_BUFFER_SIZE];
+ final CleanableFile file =
closer.register(source.fetch(temporaryDirectory, buffer));
+ final Path path = new Path(file.file().toURI());
+
+ final ClassLoader currentClassLoader =
Thread.currentThread().getContextClassLoader();
+ final Reader reader;
+ try {
+
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+ reader = closer.register(OrcFile.createReader(path,
OrcFile.readerOptions(conf)));
+ }
+ finally {
+ Thread.currentThread().setContextClassLoader(currentClassLoader);
+ }
+ // The line below gets the schema needed to read all of the columns.
+ // This can be improved in the future by projecting only the columns that
users want.
+ final TypeDescription schema = reader.getSchema();
+ final RecordReader batchReader = reader.rows(reader.options());
+ final OrcMapredRecordReader<OrcStruct> recordReader = new
OrcMapredRecordReader<>(batchReader, schema);
+ closer.register(recordReader::close);
+ return new CloseableIterator<OrcStruct>()
+ {
+ final NullWritable key = recordReader.createKey();
+ OrcStruct value = null;
+
+ @Override
+ public boolean hasNext()
+ {
+ if (value == null) {
+ try {
+ // The returned OrcStruct in next() can be kept in memory for a
while.
+ // Here, we create a new instance of OrcStruct before calling
RecordReader.next(),
+ // so that we can avoid sharing the same reference to the "value"
across rows.
+ value = recordReader.createValue();
+ if (!recordReader.next(key, value)) {
+ value = null;
+ }
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return value != null;
+ }
+
+ @Override
+ public OrcStruct next()
+ {
+ if (value == null) {
+ throw new NoSuchElementException();
+ }
+ final OrcStruct currentValue = value;
+ value = null;
+ return currentValue;
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ closer.close();
+ }
+ };
+ }
+
+ @Override
+ protected List<InputRow> parseInputRows(OrcStruct intermediateRow) throws
ParseException
+ {
+ return Collections.singletonList(
+ MapInputRowParser.parse(
+ inputRowSchema.getTimestampSpec(),
+ inputRowSchema.getDimensionsSpec(),
+ orcStructFlattener.flatten(intermediateRow)
+ )
+ );
+ }
+
+ @Override
+ protected Map<String, Object> toMap(OrcStruct intermediateRow)
+ {
+ return orcStructFlattener.toMap(intermediateRow);
+ }
+}
diff --git
a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParserTest.java
b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParserTest.java
index e588e60..38eb5da 100644
---
a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParserTest.java
+++
b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParserTest.java
@@ -27,16 +27,15 @@ import org.apache.druid.indexer.path.StaticPathSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.orc.mapred.OrcInputFormat;
import org.apache.orc.mapred.OrcStruct;
-import org.apache.orc.mapreduce.OrcInputFormat;
import org.junit.Assert;
import org.junit.Test;
@@ -44,11 +43,12 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.NoSuchElementException;
public class OrcHadoopInputRowParserTest
{
@Test
- public void testTest1() throws IOException, InterruptedException
+ public void testTest1() throws IOException
{
// total auto-discover fields (no flattenSpec, no dimensionSpec)
HadoopDruidIndexerConfig config =
loadHadoopDruidIndexerConfig("example/test_1_hadoop_job.json");
@@ -72,7 +72,7 @@ public class OrcHadoopInputRowParserTest
}
@Test
- public void testTest2() throws IOException, InterruptedException
+ public void testTest2() throws IOException
{
HadoopDruidIndexerConfig config =
loadHadoopDruidIndexerConfig("example/test_2_hadoop_job.json");
Job job = Job.getInstance(new Configuration());
@@ -97,7 +97,7 @@ public class OrcHadoopInputRowParserTest
}
@Test
- public void testOrcFile11Format() throws IOException, InterruptedException
+ public void testOrcFile11Format() throws IOException
{
// not sure what file 11 format means, but we'll test it!
@@ -133,8 +133,8 @@ public class OrcHadoopInputRowParserTest
// first row has empty 'map' column, so lets read another!
List<InputRow> allRows = getAllRows(config);
- InputRow anotherRow = allRows.get(0);
- Assert.assertEquals(14, rows.get(0).getDimensions().size());
+ InputRow anotherRow = allRows.get(allRows.size() - 1);
+ Assert.assertEquals(14, anotherRow.getDimensions().size());
Assert.assertEquals("true", anotherRow.getDimension("boolean1").get(0));
Assert.assertEquals("100", anotherRow.getDimension("byte1").get(0));
Assert.assertEquals("2048", anotherRow.getDimension("short1").get(0));
@@ -142,7 +142,7 @@ public class OrcHadoopInputRowParserTest
Assert.assertEquals("9223372036854775807",
anotherRow.getDimension("long1").get(0));
Assert.assertEquals("2.0", anotherRow.getDimension("float1").get(0));
Assert.assertEquals("-5.0", anotherRow.getDimension("double1").get(0));
- Assert.assertEquals("AAECAwQAAA==",
rows.get(0).getDimension("bytes1").get(0));
+ Assert.assertEquals("", anotherRow.getDimension("bytes1").get(0));
Assert.assertEquals("bye", anotherRow.getDimension("string1").get(0));
Assert.assertEquals("1.23456786547457E7",
anotherRow.getDimension("decimal1").get(0));
Assert.assertEquals("2",
anotherRow.getDimension("struct_list_struct_int").get(0));
@@ -151,7 +151,7 @@ public class OrcHadoopInputRowParserTest
}
@Test
- public void testOrcSplitElim() throws IOException, InterruptedException
+ public void testOrcSplitElim() throws IOException
{
// not sure what SplitElim means, but we'll test it!
@@ -175,7 +175,7 @@ public class OrcHadoopInputRowParserTest
}
@Test
- public void testDate1900() throws IOException, InterruptedException
+ public void testDate1900() throws IOException
{
/*
TestOrcFile.testDate1900.orc
@@ -194,7 +194,7 @@ public class OrcHadoopInputRowParserTest
}
@Test
- public void testDate2038() throws IOException, InterruptedException
+ public void testDate2038() throws IOException
{
/*
TestOrcFile.testDate2038.orc
@@ -217,54 +217,68 @@ public class OrcHadoopInputRowParserTest
return HadoopDruidIndexerConfig.fromFile(new File(configPath));
}
- private static OrcStruct getFirstRow(Job job, String orcPath) throws IOException, InterruptedException
+ private static OrcStruct getFirstRow(Job job, String orcPath) throws IOException
{
File testFile = new File(orcPath);
Path path = new Path(testFile.getAbsoluteFile().toURI());
- FileSplit split = new FileSplit(path, 0, testFile.length(), null);
+ FileSplit split = new FileSplit(path, 0, testFile.length(), new String[]{"host"});
- InputFormat inputFormat = ReflectionUtils.newInstance(
+ InputFormat<NullWritable, OrcStruct> inputFormat = ReflectionUtils.newInstance(
OrcInputFormat.class,
job.getConfiguration()
);
- TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
-
- try (RecordReader reader = inputFormat.createRecordReader(split, context)) {
-
- reader.initialize(split, context);
- reader.nextKeyValue();
- return (OrcStruct) reader.getCurrentValue();
+ RecordReader<NullWritable, OrcStruct> reader = inputFormat.getRecordReader(
+ split,
+ new JobConf(job.getConfiguration()),
+ null
+ );
+ try {
+ final NullWritable key = reader.createKey();
+ final OrcStruct value = reader.createValue();
+ if (reader.next(key, value)) {
+ return value;
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
+ finally {
+ reader.close();
}
}
- private static List<InputRow> getAllRows(HadoopDruidIndexerConfig config)
- throws IOException, InterruptedException
+ private static List<InputRow> getAllRows(HadoopDruidIndexerConfig config) throws IOException
{
Job job = Job.getInstance(new Configuration());
config.intoConfiguration(job);
File testFile = new File(((StaticPathSpec)
config.getPathSpec()).getPaths());
Path path = new Path(testFile.getAbsoluteFile().toURI());
- FileSplit split = new FileSplit(path, 0, testFile.length(), null);
+ FileSplit split = new FileSplit(path, 0, testFile.length(), new String[]{"host"});
- InputFormat inputFormat = ReflectionUtils.newInstance(
+ InputFormat<NullWritable, OrcStruct> inputFormat = ReflectionUtils.newInstance(
OrcInputFormat.class,
job.getConfiguration()
);
- TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
-
- try (RecordReader reader = inputFormat.createRecordReader(split, context)) {
+ RecordReader<NullWritable, OrcStruct> reader = inputFormat.getRecordReader(
+ split,
+ new JobConf(job.getConfiguration()),
+ null
+ );
+ try {
List<InputRow> records = new ArrayList<>();
InputRowParser parser = config.getParser();
+ final NullWritable key = reader.createKey();
+ OrcStruct value = reader.createValue();
- reader.initialize(split, context);
- while (reader.nextKeyValue()) {
- reader.nextKeyValue();
- Object data = reader.getCurrentValue();
- records.add(((List<InputRow>) parser.parseBatch(data)).get(0));
+ while (reader.next(key, value)) {
+ records.add(((List<InputRow>) parser.parseBatch(value)).get(0));
+ value = reader.createValue();
}
return records;
}
+ finally {
+ reader.close();
+ }
}
}
diff --git
a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java
b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java
new file mode 100644
index 0000000..bef9b64
--- /dev/null
+++
b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.orc;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.FileEntity;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+
+public class OrcReaderTest
+{
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ // This test is migrated from OrcHadoopInputRowParserTest
+ @Test
+ public void testTest1() throws IOException
+ {
+ final InputEntityReader reader = createReader(
+ new TimestampSpec("timestamp", "auto", null),
+ new
DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("col1",
"col2"))),
+ new OrcInputFormat(null, null, new Configuration()),
+ "example/test_1.orc"
+ );
+ try (CloseableIterator<InputRow> iterator = reader.read()) {
+ Assert.assertTrue(iterator.hasNext());
+ final InputRow row = iterator.next();
+ Assert.assertEquals(DateTimes.of("2016-01-01T00:00:00.000Z"),
row.getTimestamp());
+ Assert.assertEquals("bar",
Iterables.getOnlyElement(row.getDimension("col1")));
+ Assert.assertEquals(ImmutableList.of("dat1", "dat2", "dat3"),
row.getDimension("col2"));
+ Assert.assertEquals(1.1, row.getMetric("val1").doubleValue(), 0.001);
+ Assert.assertFalse(iterator.hasNext());
+ }
+ }
+
+ // This test is migrated from OrcHadoopInputRowParserTest
+ @Test
+ public void testTest2() throws IOException
+ {
+ final InputFormat inputFormat = new OrcInputFormat(
+ new JSONPathSpec(
+ true,
+ Collections.singletonList(new
JSONPathFieldSpec(JSONPathFieldType.PATH, "col7-subcol7", "$.col7.subcol7"))
+ ),
+ null,
+ new Configuration()
+ );
+ final InputEntityReader reader = createReader(
+ new TimestampSpec("timestamp", "auto", null),
+ new DimensionsSpec(null),
+ inputFormat,
+ "example/test_2.orc"
+ );
+ try (CloseableIterator<InputRow> iterator = reader.read()) {
+ Assert.assertTrue(iterator.hasNext());
+ final InputRow row = iterator.next();
+ Assert.assertEquals(DateTimes.of("2016-01-01T00:00:00.000Z"),
row.getTimestamp());
+ Assert.assertEquals("bar",
Iterables.getOnlyElement(row.getDimension("col1")));
+ Assert.assertEquals(ImmutableList.of("dat1", "dat2", "dat3"),
row.getDimension("col2"));
+ Assert.assertEquals("1.1",
Iterables.getOnlyElement(row.getDimension("col3")));
+ Assert.assertEquals("2",
Iterables.getOnlyElement(row.getDimension("col4")));
+ Assert.assertEquals("3.5",
Iterables.getOnlyElement(row.getDimension("col5")));
+ Assert.assertTrue(row.getDimension("col6").isEmpty());
+ Assert.assertFalse(iterator.hasNext());
+ }
+ }
+
+ // This test is migrated from OrcHadoopInputRowParserTest
+ @Test
+ public void testOrcFile11Format() throws IOException
+ {
+ final OrcInputFormat inputFormat = new OrcInputFormat(
+ new JSONPathSpec(
+ true,
+ ImmutableList.of(
+ new JSONPathFieldSpec(JSONPathFieldType.PATH,
"struct_list_struct_int", "$.middle.list[1].int1"),
+ new JSONPathFieldSpec(JSONPathFieldType.PATH,
"struct_list_struct_intlist", "$.middle.list[*].int1"),
+ new JSONPathFieldSpec(JSONPathFieldType.PATH,
"list_struct_string", "$.list[0].string1"),
+ new JSONPathFieldSpec(JSONPathFieldType.PATH,
"map_struct_int", "$.map.chani.int1")
+ )
+ ),
+ null,
+ new Configuration()
+ );
+ final InputEntityReader reader = createReader(
+ new TimestampSpec("ts", "millis", null),
+ new DimensionsSpec(null),
+ inputFormat,
+ "example/orc-file-11-format.orc"
+ );
+ try (CloseableIterator<InputRow> iterator = reader.read()) {
+ int actualRowCount = 0;
+
+ // Check the first row
+ Assert.assertTrue(iterator.hasNext());
+ InputRow row = iterator.next();
+ actualRowCount++;
+ Assert.assertEquals("false",
Iterables.getOnlyElement(row.getDimension("boolean1")));
+ Assert.assertEquals("1",
Iterables.getOnlyElement(row.getDimension("byte1")));
+ Assert.assertEquals("1024",
Iterables.getOnlyElement(row.getDimension("short1")));
+ Assert.assertEquals("65536",
Iterables.getOnlyElement(row.getDimension("int1")));
+ Assert.assertEquals("9223372036854775807",
Iterables.getOnlyElement(row.getDimension("long1")));
+ Assert.assertEquals("1.0",
Iterables.getOnlyElement(row.getDimension("float1")));
+ Assert.assertEquals("-15.0",
Iterables.getOnlyElement(row.getDimension("double1")));
+ Assert.assertEquals("AAECAwQAAA==",
Iterables.getOnlyElement(row.getDimension("bytes1")));
+ Assert.assertEquals("hi",
Iterables.getOnlyElement(row.getDimension("string1")));
+ Assert.assertEquals("1.23456786547456E7",
Iterables.getOnlyElement(row.getDimension("decimal1")));
+ Assert.assertEquals("2",
Iterables.getOnlyElement(row.getDimension("struct_list_struct_int")));
+ Assert.assertEquals(ImmutableList.of("1", "2"),
row.getDimension("struct_list_struct_intlist"));
+ Assert.assertEquals("good",
Iterables.getOnlyElement(row.getDimension("list_struct_string")));
+ Assert.assertEquals(DateTimes.of("2000-03-12T15:00:00.0Z"),
row.getTimestamp());
+
+ while (iterator.hasNext()) {
+ actualRowCount++;
+ row = iterator.next();
+ }
+
+ // Check the last row
+ Assert.assertEquals("true",
Iterables.getOnlyElement(row.getDimension("boolean1")));
+ Assert.assertEquals("100",
Iterables.getOnlyElement(row.getDimension("byte1")));
+ Assert.assertEquals("2048",
Iterables.getOnlyElement(row.getDimension("short1")));
+ Assert.assertEquals("65536",
Iterables.getOnlyElement(row.getDimension("int1")));
+ Assert.assertEquals("9223372036854775807",
Iterables.getOnlyElement(row.getDimension("long1")));
+ Assert.assertEquals("2.0",
Iterables.getOnlyElement(row.getDimension("float1")));
+ Assert.assertEquals("-5.0",
Iterables.getOnlyElement(row.getDimension("double1")));
+ Assert.assertEquals("",
Iterables.getOnlyElement(row.getDimension("bytes1")));
+ Assert.assertEquals("bye",
Iterables.getOnlyElement(row.getDimension("string1")));
+ Assert.assertEquals("1.23456786547457E7",
Iterables.getOnlyElement(row.getDimension("decimal1")));
+ Assert.assertEquals("2",
Iterables.getOnlyElement(row.getDimension("struct_list_struct_int")));
+ Assert.assertEquals(ImmutableList.of("1", "2"),
row.getDimension("struct_list_struct_intlist"));
+ Assert.assertEquals("cat",
Iterables.getOnlyElement(row.getDimension("list_struct_string")));
+ Assert.assertEquals("5",
Iterables.getOnlyElement(row.getDimension("map_struct_int")));
+ Assert.assertEquals(DateTimes.of("2000-03-12T15:00:01.000Z"),
row.getTimestamp());
+
+ Assert.assertEquals(7500, actualRowCount);
+ }
+ }
+
+ // This test is migrated from OrcHadoopInputRowParserTest
+ @Test
+ public void testOrcSplitElim() throws IOException
+ {
+ final InputEntityReader reader = createReader(
+ new TimestampSpec("ts", "millis", null),
+ new DimensionsSpec(null),
+ new OrcInputFormat(new JSONPathSpec(true, null), null, new
Configuration()),
+ "example/orc_split_elim.orc"
+ );
+ try (CloseableIterator<InputRow> iterator = reader.read()) {
+ int actualRowCount = 0;
+ Assert.assertTrue(iterator.hasNext());
+ final InputRow row = iterator.next();
+ actualRowCount++;
+ Assert.assertEquals(DateTimes.of("1969-12-31T16:00:00.0Z"),
row.getTimestamp());
+ Assert.assertEquals("2",
Iterables.getOnlyElement(row.getDimension("userid")));
+ Assert.assertEquals("foo",
Iterables.getOnlyElement(row.getDimension("string1")));
+ Assert.assertEquals("0.8",
Iterables.getOnlyElement(row.getDimension("subtype")));
+ Assert.assertEquals("1.2",
Iterables.getOnlyElement(row.getDimension("decimal1")));
+ while (iterator.hasNext()) {
+ actualRowCount++;
+ iterator.next();
+ }
+ Assert.assertEquals(25000, actualRowCount);
+ }
+ }
+
+ // This test is migrated from OrcHadoopInputRowParserTest
+ @Test
+ public void testDate1900() throws IOException
+ {
+ final InputEntityReader reader = createReader(
+ new TimestampSpec("time", "millis", null),
+ new DimensionsSpec(null, Collections.singletonList("time"), null),
+ new OrcInputFormat(new JSONPathSpec(true, null), null, new
Configuration()),
+ "example/TestOrcFile.testDate1900.orc"
+ );
+ try (CloseableIterator<InputRow> iterator = reader.read()) {
+ int actualRowCount = 0;
+ Assert.assertTrue(iterator.hasNext());
+ final InputRow row = iterator.next();
+ actualRowCount++;
+ Assert.assertEquals(1, row.getDimensions().size());
+ Assert.assertEquals(DateTimes.of("1900-05-05T12:34:56.1Z"),
row.getTimestamp());
+ Assert.assertEquals("1900-12-25T00:00:00.000Z",
Iterables.getOnlyElement(row.getDimension("date")));
+ while (iterator.hasNext()) {
+ actualRowCount++;
+ iterator.next();
+ }
+ Assert.assertEquals(70000, actualRowCount);
+ }
+ }
+
+ // This test is migrated from OrcHadoopInputRowParserTest
+ @Test
+ public void testDate2038() throws IOException
+ {
+ final InputEntityReader reader = createReader(
+ new TimestampSpec("time", "millis", null),
+ new DimensionsSpec(null, Collections.singletonList("time"), null),
+ new OrcInputFormat(new JSONPathSpec(true, null), null, new
Configuration()),
+ "example/TestOrcFile.testDate2038.orc"
+ );
+ try (CloseableIterator<InputRow> iterator = reader.read()) {
+ int actualRowCount = 0;
+ Assert.assertTrue(iterator.hasNext());
+ final InputRow row = iterator.next();
+ actualRowCount++;
+ Assert.assertEquals(1, row.getDimensions().size());
+ Assert.assertEquals(DateTimes.of("2038-05-05T12:34:56.1Z"),
row.getTimestamp());
+ Assert.assertEquals("2038-12-25T00:00:00.000Z",
Iterables.getOnlyElement(row.getDimension("date")));
+ while (iterator.hasNext()) {
+ actualRowCount++;
+ iterator.next();
+ }
+ Assert.assertEquals(212000, actualRowCount);
+ }
+ }
+
+ private InputEntityReader createReader(
+ TimestampSpec timestampSpec,
+ DimensionsSpec dimensionsSpec,
+ InputFormat inputFormat,
+ String dataFile
+ ) throws IOException
+ {
+ final InputRowSchema schema = new InputRowSchema(timestampSpec,
dimensionsSpec, Collections.emptyList());
+ final FileEntity entity = new FileEntity(new File(dataFile));
+ return inputFormat.createReader(schema, entity,
temporaryFolder.newFolder());
+ }
+}
diff --git a/extensions-core/parquet-extensions/pom.xml
b/extensions-core/parquet-extensions/pom.xml
index b19b271..936028e 100644
--- a/extensions-core/parquet-extensions/pom.xml
+++ b/extensions-core/parquet-extensions/pom.xml
@@ -288,72 +288,72 @@
<artifactId>jersey-json</artifactId>
</exclusion>
<exclusion>
- <artifactId>log4j</artifactId>
<groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
</exclusion>
<exclusion>
- <artifactId>jetty-sslengine</artifactId>
<groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-sslengine</artifactId>
</exclusion>
<exclusion>
- <artifactId>jetty-util</artifactId>
<groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
- <artifactId>jets3t</artifactId>
<groupId>net.java.dev.jets3t</groupId>
+ <artifactId>jets3t</artifactId>
</exclusion>
<exclusion>
- <artifactId>jetty</artifactId>
<groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
</exclusion>
<exclusion>
- <artifactId>gson</artifactId>
<groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
</exclusion>
<exclusion>
- <artifactId>xmlenc</artifactId>
<groupId>xmlenc</groupId>
+ <artifactId>xmlenc</artifactId>
</exclusion>
<exclusion>
- <artifactId>httpclient</artifactId>
<groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
</exclusion>
<exclusion>
- <artifactId>jsch</artifactId>
<groupId>com.jcraft</groupId>
+ <artifactId>jsch</artifactId>
</exclusion>
<exclusion>
- <artifactId>protobuf-java</artifactId>
<groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
</exclusion>
<exclusion>
- <artifactId>commons-collections</artifactId>
<groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
</exclusion>
<exclusion>
- <artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
- <artifactId>commons-cli</artifactId>
<groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
</exclusion>
<exclusion>
- <artifactId>commons-digester</artifactId>
<groupId>commons-digester</groupId>
+ <artifactId>commons-digester</artifactId>
</exclusion>
<exclusion>
- <artifactId>commons-beanutils-core</artifactId>
<groupId>commons-beanutils</groupId>
+ <artifactId>commons-beanutils-core</artifactId>
</exclusion>
<exclusion>
- <artifactId>apacheds-kerberos-codec</artifactId>
<groupId>org.apache.directory.server</groupId>
+ <artifactId>apacheds-kerberos-codec</artifactId>
</exclusion>
<exclusion>
- <artifactId>nimbus-jose-jwt</artifactId>
<groupId>com.nimbusds</groupId>
+ <artifactId>nimbus-jose-jwt</artifactId>
</exclusion>
</exclusions>
</dependency>
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index dcaffb3..e87e0f7 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -62,6 +62,7 @@ import org.apache.druid.segment.loading.NoopDataSegmentKiller;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
import org.junit.After;
import org.junit.Before;
@@ -78,7 +79,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
-public abstract class IngestionTestBase
+public abstract class IngestionTestBase extends InitializedNullHandlingTest
{
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]