This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 6d11edd ORC read integration for Spark 2.4.0 (#139)
6d11edd is described below
commit 6d11edd196c6ba7813af2145035787ac2b41ffda
Author: Edgar Rodriguez <[email protected]>
AuthorDate: Thu May 16 13:15:37 2019 -0700
ORC read integration for Spark 2.4.0 (#139)
---
build.gradle | 5 +-
.../java/org/apache/iceberg/orc/ColumnIdMap.java | 23 +-
orc/src/main/java/org/apache/iceberg/orc/ORC.java | 85 ++-
.../org/apache/iceberg/orc/OrcFileAppender.java | 103 ++-
.../java/org/apache/iceberg/orc/OrcIterable.java | 125 ++++
.../org/apache/iceberg/orc/OrcValueReader.java | 34 +
.../org/apache/iceberg/orc/OrcValueWriter.java | 37 +
.../org/apache/iceberg/orc/TypeConversion.java | 23 +-
...erator.java => VectorizedRowBatchIterator.java} | 34 +-
.../apache/iceberg/spark/data/SparkOrcReader.java | 782 +++++++++++++++++++++
.../apache/iceberg/spark/data/SparkOrcWriter.java | 409 +++++++++++
.../org/apache/iceberg/spark/source/Reader.java | 17 +
.../org/apache/iceberg/spark/data/TestHelpers.java | 3 +
.../iceberg/spark/data/TestSparkOrcReader.java | 64 ++
14 files changed, 1641 insertions(+), 103 deletions(-)
diff --git a/build.gradle b/build.gradle
index 5161e92..51d4382 100644
--- a/build.gradle
+++ b/build.gradle
@@ -76,7 +76,7 @@ subprojects {
ext {
hadoopVersion = '2.7.3'
avroVersion = '1.8.2'
- orcVersion = '1.4.2'
+ orcVersion = '1.5.5'
parquetVersion = '1.10.0'
hiveVersion = '1.2.1'
@@ -266,6 +266,7 @@ project(':iceberg-spark') {
compile project(':iceberg-api')
compile project(':iceberg-common')
compile project(':iceberg-core')
+ compile project(':iceberg-orc')
compile project(':iceberg-parquet')
compileOnly "org.apache.avro:avro:$avroVersion"
@@ -369,11 +370,13 @@ project(':iceberg-presto-runtime') {
dependencies {
shadow project(':iceberg-api')
shadow project(':iceberg-core')
+ shadow project(':iceberg-orc')
shadow project(':iceberg-parquet')
shadow project(':iceberg-hive')
shadow "org.apache.parquet:parquet-avro:$parquetVersion"
shadow "org.apache.avro:avro:$avroVersion"
+ shadow "org.apache.orc:orc-core:$orcVersion:nohive"
shadow ("org.apache.hive:hive-metastore:$hiveVersion") {
exclude group: 'org.apache.hadoop', module: 'hadoop-common'
// exclude group: 'org.apache.orc', module: 'orc-core'
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ColumnIdMap.java
b/orc/src/main/java/org/apache/iceberg/orc/ColumnIdMap.java
index 330554f..16dc3b0 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ColumnIdMap.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ColumnIdMap.java
@@ -1,17 +1,20 @@
/*
- * Copyright 2018 Hortonworks
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.iceberg.orc;
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
index 157a761..c6cb036 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -1,38 +1,47 @@
/*
- * Copyright 2018 Hortonworks
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.iceberg.orc;
import com.google.common.base.Preconditions;
-import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
+import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Schema;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.hadoop.HadoopOutputFile;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
+import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
-import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;
+import static
org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch.DEFAULT_SIZE;
+
public class ORC {
+
+ static final String VECTOR_ROW_BATCH_SIZE = "iceberg.orc.vectorbatch.size";
+
private ORC() {
}
@@ -44,6 +53,7 @@ public class ORC {
private final OutputFile file;
private final Configuration conf;
private Schema schema = null;
+ private Function<TypeDescription, OrcValueWriter<?>> createWriterFunc;
private Map<String, byte[]> metadata = new HashMap<>();
private WriteBuilder(OutputFile file) {
@@ -65,15 +75,27 @@ public class ORC {
return this;
}
+ public WriteBuilder createWriterFunc(Function<TypeDescription,
OrcValueWriter<?>> writerFunction) {
+ this.createWriterFunc = writerFunction;
+ return this;
+ }
+
+ public WriteBuilder setAll(Map<String, String> properties) {
+ properties.forEach(conf::set);
+ return this;
+ }
+
public WriteBuilder schema(Schema schema) {
this.schema = schema;
return this;
}
- public OrcFileAppender build() {
- OrcFile.WriterOptions options =
- OrcFile.writerOptions(conf);
- return new OrcFileAppender(schema, file, options, metadata);
+ public <D> FileAppender<D> build() {
+ Preconditions.checkNotNull(schema, "Schema is required");
+ OrcFile.WriterOptions options = OrcFile.writerOptions(conf);
+ return new OrcFileAppender<>(TypeConversion.toOrc(schema, new
ColumnIdMap()),
+ this.file, createWriterFunc, options, metadata,
+ conf.getInt(VECTOR_ROW_BATCH_SIZE, DEFAULT_SIZE));
}
}
@@ -88,6 +110,8 @@ public class ORC {
private Long start = null;
private Long length = null;
+ private Function<Schema, OrcValueReader<?>> readerFunction;
+
private ReadBuilder(InputFile file) {
Preconditions.checkNotNull(file, "Input file cannot be null");
this.file = file;
@@ -116,27 +140,24 @@ public class ORC {
return this;
}
+ public ReadBuilder caseSensitive(boolean caseSensitive) {
+ OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(this.conf,
caseSensitive);
+ return this;
+ }
+
public ReadBuilder config(String property, String value) {
conf.set(property, value);
return this;
}
- public OrcIterator build() {
+ public ReadBuilder createReaderFunc(Function<Schema, OrcValueReader<?>>
readerFunction) {
+ this.readerFunction = readerFunction;
+ return this;
+ }
+
+ public <D> CloseableIterable<D> build() {
Preconditions.checkNotNull(schema, "Schema is required");
- try {
- Path path = new Path(file.location());
- Reader reader = OrcFile.createReader(path,
OrcFile.readerOptions(conf));
- ColumnIdMap columnIds = new ColumnIdMap();
- TypeDescription orcSchema = TypeConversion.toOrc(schema, columnIds);
- Reader.Options options = reader.options();
- if (start != null) {
- options.range(start, length);
- }
- options.schema(orcSchema);
- return new OrcIterator(path, orcSchema, reader.rows(options));
- } catch (IOException e) {
- throw new RuntimeException("Can't open " + file.location(), e);
- }
+ return new OrcIterable<>(file, conf, schema, start, length,
readerFunction);
}
}
}
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
index 257e084..60c738c 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
@@ -1,18 +1,22 @@
/*
- * Copyright 2018 Hortonworks
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
+
package org.apache.iceberg.orc;
import com.google.common.base.Preconditions;
@@ -20,9 +24,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
+import java.util.function.Function;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Metrics;
-import org.apache.iceberg.Schema;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.orc.ColumnStatistics;
@@ -34,36 +38,40 @@ import
org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
/**
* Create a file appender for ORC.
*/
-public class OrcFileAppender implements FileAppender<VectorizedRowBatch> {
- private final Writer writer;
+class OrcFileAppender<D> implements FileAppender<D> {
+ private final int batchSize;
private final TypeDescription orcSchema;
private final ColumnIdMap columnIds = new ColumnIdMap();
private final Path path;
+ private final Writer writer;
+ private final VectorizedRowBatch batch;
+ private final OrcValueWriter<D> valueWriter;
private boolean isClosed = false;
- public static final String COLUMN_NUMBERS_ATTRIBUTE = "iceberg.column.ids";
+ private static final String COLUMN_NUMBERS_ATTRIBUTE = "iceberg.column.ids";
- OrcFileAppender(Schema schema,
- OutputFile file,
- OrcFile.WriterOptions options,
- Map<String,byte[]> metadata) {
- orcSchema = TypeConversion.toOrc(schema, columnIds);
- options.setSchema(orcSchema);
+ OrcFileAppender(TypeDescription schema, OutputFile file,
+ Function<TypeDescription, OrcValueWriter<?>>
createWriterFunc,
+ OrcFile.WriterOptions options, Map<String, byte[]> metadata,
+ int batchSize) {
+ orcSchema = schema;
path = new Path(file.location());
- try {
- writer = OrcFile.createWriter(path, options);
- } catch (IOException e) {
- throw new RuntimeException("Can't create file " + path, e);
- }
- writer.addUserMetadata(COLUMN_NUMBERS_ATTRIBUTE, columnIds.serialize());
- metadata.forEach(
- (key,value) -> writer.addUserMetadata(key, ByteBuffer.wrap(value)));
+ this.batchSize = batchSize;
+ batch = orcSchema.createRowBatch(this.batchSize);
+
+ options.setSchema(orcSchema);
+ writer = newOrcWriter(file, columnIds, options, metadata);
+ valueWriter = newOrcValueWriter(orcSchema, createWriterFunc);
}
@Override
- public void add(VectorizedRowBatch datum) {
+ public void add(D datum) {
try {
- writer.addRowBatch(datum);
+ valueWriter.write(datum, batch);
+ if (batch.size == this.batchSize) {
+ writer.addRowBatch(batch);
+ batch.reset();
+ }
} catch (IOException e) {
throw new RuntimeException("Problem writing to ORC file " + path, e);
}
@@ -108,12 +116,39 @@ public class OrcFileAppender implements
FileAppender<VectorizedRowBatch> {
@Override
public void close() throws IOException {
if (!isClosed) {
- this.isClosed = true;
- writer.close();
+ try {
+ if (batch.size > 0) {
+ writer.addRowBatch(batch);
+ batch.reset();
+ }
+ } finally {
+ writer.close();
+ this.isClosed = true;
+ }
}
}
- public TypeDescription getSchema() {
- return orcSchema;
+ private static Writer newOrcWriter(OutputFile file,
+ ColumnIdMap columnIds,
+ OrcFile.WriterOptions options,
Map<String, byte[]> metadata) {
+ final Path locPath = new Path(file.location());
+ final Writer writer;
+
+ try {
+ writer = OrcFile.createWriter(locPath, options);
+ } catch (IOException e) {
+ throw new RuntimeException("Can't create file " + locPath, e);
+ }
+
+ writer.addUserMetadata(COLUMN_NUMBERS_ATTRIBUTE, columnIds.serialize());
+ metadata.forEach((key,value) -> writer.addUserMetadata(key,
ByteBuffer.wrap(value)));
+
+ return writer;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <D> OrcValueWriter<D> newOrcValueWriter(TypeDescription
schema,
+
Function<TypeDescription, OrcValueWriter<?>> createWriterFunc) {
+ return (OrcValueWriter<D>) createWriterFunc.apply(schema);
}
}
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
b/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
new file mode 100644
index 0000000..b4bed83
--- /dev/null
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.orc;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+/**
+ * Iterable used to read rows from ORC.
+ */
+class OrcIterable<T> extends CloseableGroup implements CloseableIterable<T> {
+ private final Configuration config;
+ private final Schema schema;
+ private final InputFile file;
+ private final Long start;
+ private final Long length;
+ private final Function<Schema, OrcValueReader<?>> readerFunction;
+
+ OrcIterable(InputFile file, Configuration config, Schema schema,
+ Long start, Long length,
+ Function<Schema, OrcValueReader<?>> readerFunction) {
+ this.schema = schema;
+ this.readerFunction = readerFunction;
+ this.file = file;
+ this.start = start;
+ this.length = length;
+ this.config = config;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Iterator<T> iterator() {
+ return new OrcIterator(
+ newOrcIterator(file, TypeConversion.toOrc(schema, new ColumnIdMap()),
+ start, length, newFileReader(file, config)),
+ readerFunction.apply(schema));
+ }
+
+ private static VectorizedRowBatchIterator newOrcIterator(InputFile file,
+ TypeDescription
readerSchema,
+ Long start, Long
length,
+ Reader
orcFileReader) {
+ final Reader.Options options = orcFileReader.options();
+ if (start != null) {
+ options.range(start, length);
+ }
+ options.schema(readerSchema);
+
+ try {
+ return new VectorizedRowBatchIterator(file.location(), readerSchema,
orcFileReader.rows(options));
+ } catch (IOException ioe) {
+ throw new RuntimeIOException(ioe, "Failed to get ORC rows for file: %s",
file);
+ }
+ }
+
+ private static Reader newFileReader(InputFile file, Configuration config) {
+ try {
+ return OrcFile.createReader(new Path(file.location()),
+ OrcFile.readerOptions(config));
+ } catch (IOException ioe) {
+ throw new RuntimeIOException(ioe, "Failed to open file: %s", file);
+ }
+ }
+
+ private static class OrcIterator<T> implements Iterator<T> {
+
+ private int nextRow;
+ private VectorizedRowBatch current;
+
+ final VectorizedRowBatchIterator batchIter;
+ final OrcValueReader<T> reader;
+
+ OrcIterator(VectorizedRowBatchIterator batchIter, OrcValueReader<T>
reader) {
+ this.batchIter = batchIter;
+ this.reader = reader;
+ current = null;
+ nextRow = 0;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return (current != null && nextRow < current.size) ||
batchIter.hasNext();
+ }
+
+ @Override
+ public T next() {
+ if (current == null || nextRow >= current.size) {
+ current = batchIter.next();
+ nextRow = 0;
+ }
+
+ return this.reader.read(current, nextRow++);
+ }
+ }
+
+}
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcValueReader.java
b/orc/src/main/java/org/apache/iceberg/orc/OrcValueReader.java
new file mode 100644
index 0000000..cfc9ebb
--- /dev/null
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcValueReader.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.orc;
+
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+/**
+ * Used for implementing ORC value readers.
+ */
+public interface OrcValueReader<T> {
+
+ /**
+ * Reads a value in the given row.
+ */
+ T read(VectorizedRowBatch batch, int row);
+
+}
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java
b/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java
new file mode 100644
index 0000000..5f1e167
--- /dev/null
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.orc;
+
+import java.io.IOException;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+/**
+ * Write data value of a schema.
+ */
+public interface OrcValueWriter<T> {
+
+ /**
+ * Writes the data.
+ * @param value the data value to write.
+ * @param output the VectorizedRowBatch to which the output will be written.
+ * @throws IOException if there's any IO error while writing the data value.
+ */
+ void write(T value, VectorizedRowBatch output) throws IOException;
+}
diff --git a/orc/src/main/java/org/apache/iceberg/orc/TypeConversion.java
b/orc/src/main/java/org/apache/iceberg/orc/TypeConversion.java
index bc57f8d..f9839f6 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/TypeConversion.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/TypeConversion.java
@@ -1,17 +1,20 @@
/*
- * Copyright 2018 Hortonworks
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.iceberg.orc;
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcIterator.java
b/orc/src/main/java/org/apache/iceberg/orc/VectorizedRowBatchIterator.java
similarity index 55%
rename from orc/src/main/java/org/apache/iceberg/orc/OrcIterator.java
rename to
orc/src/main/java/org/apache/iceberg/orc/VectorizedRowBatchIterator.java
index 589e5ee..ddc0bce 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcIterator.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/VectorizedRowBatchIterator.java
@@ -1,17 +1,20 @@
/*
- * Copyright 2018 Hortonworks
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.iceberg.orc;
@@ -19,7 +22,6 @@ package org.apache.iceberg.orc;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
-import org.apache.hadoop.fs.Path;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
@@ -29,14 +31,14 @@ import
org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
* Because the same VectorizedRowBatch is reused on each call to next,
* it gets changed when hasNext or next is called.
*/
-public class OrcIterator implements Iterator<VectorizedRowBatch>, Closeable {
- private final Path filename;
+public class VectorizedRowBatchIterator implements
Iterator<VectorizedRowBatch>, Closeable {
+ private final String fileLocation;
private final RecordReader rows;
private final VectorizedRowBatch batch;
private boolean advanced = false;
- OrcIterator(Path filename, TypeDescription schema, RecordReader rows) {
- this.filename = filename;
+ VectorizedRowBatchIterator(String fileLocation, TypeDescription schema,
RecordReader rows) {
+ this.fileLocation = fileLocation;
this.rows = rows;
this.batch = schema.createRowBatch();
}
@@ -51,7 +53,7 @@ public class OrcIterator implements
Iterator<VectorizedRowBatch>, Closeable {
try {
rows.nextBatch(batch);
} catch (IOException e) {
- throw new RuntimeException("Problem reading ORC file " + filename, e);
+ throw new RuntimeException("Problem reading ORC file " + fileLocation,
e);
}
advanced = true;
}
diff --git
a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
new file mode 100644
index 0000000..301a550
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
@@ -0,0 +1,782 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.orc.ColumnIdMap;
+import org.apache.iceberg.orc.OrcValueReader;
+import org.apache.iceberg.orc.TypeConversion;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.storage.serde2.io.DateWritable;
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
+import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter;
+import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
+import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.Platform;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.util.List;
+
+/**
+ * Converts the OrcIterator, which returns ORC's VectorizedRowBatch to a
+ * set of Spark's UnsafeRows.
+ *
+ * It minimizes allocations by reusing most of the objects in the
implementation.
+ */
+public class SparkOrcReader implements OrcValueReader<InternalRow> {
+ private final static int INITIAL_SIZE = 128 * 1024;
+ private final int numFields;
+ private final TypeDescription readSchema;
+
+ public SparkOrcReader(Schema readSchema) {
+ this.readSchema = TypeConversion.toOrc(readSchema, new ColumnIdMap());
+ numFields = readSchema.columns().size();
+ }
+
+ private Converter[] buildConverters(final UnsafeRowWriter writer) {
+ final Converter[] converters = new Converter[numFields];
+ for(int c = 0; c < numFields; ++c) {
+ converters[c] = buildConverter(writer, readSchema.getChildren().get(c));
+ }
+ return converters;
+ }
+
+ @Override
+ public InternalRow read(VectorizedRowBatch batch, int row) {
+ final UnsafeRowWriter rowWriter = new UnsafeRowWriter(numFields,
INITIAL_SIZE);
+ final Converter[] converters = buildConverters(rowWriter);
+
+ rowWriter.reset();
+ rowWriter.zeroOutNullBytes();
+ for(int c=0; c < batch.cols.length; ++c) {
+ converters[c].convert(rowWriter, c, batch.cols[c], row);
+ }
+ return rowWriter.getRow();
+ }
+
+ private static String rowToString(SpecializedGetters row, TypeDescription
schema) {
+ final List<TypeDescription> children = schema.getChildren();
+ final StringBuilder rowBuilder = new StringBuilder("{");
+
+ for(int c = 0; c < children.size(); ++c) {
+ rowBuilder.append("\"");
+ rowBuilder.append(schema.getFieldNames().get(c));
+ rowBuilder.append("\": ");
+ rowBuilder.append(rowEntryToString(row, c, children.get(c)));
+ if (c != children.size() - 1) {
+ rowBuilder.append(", ");
+ }
+ }
+ rowBuilder.append("}");
+ return rowBuilder.toString();
+ }
+
+ private static String rowEntryToString(SpecializedGetters row, int ord,
TypeDescription schema) {
+ switch (schema.getCategory()) {
+ case BOOLEAN:
+ return Boolean.toString(row.getBoolean(ord));
+ case BYTE:
+ return Byte.toString(row.getByte(ord));
+ case SHORT:
+ return Short.toString(row.getShort(ord));
+ case INT:
+ return Integer.toString(row.getInt(ord));
+ case LONG:
+ return Long.toString(row.getLong(ord));
+ case FLOAT:
+ return Float.toString(row.getFloat(ord));
+ case DOUBLE:
+ return Double.toString(row.getDouble(ord));
+ case CHAR:
+ case VARCHAR:
+ case STRING:
+ return "\"" + row.getUTF8String(ord) + "\"";
+ case BINARY: {
+ byte[] bin = row.getBinary(ord);
+ final StringBuilder binStr;
+ if (bin == null) {
+ binStr = new StringBuilder("null");
+ } else {
+ binStr = new StringBuilder("[");
+ for (int i = 0; i < bin.length; ++i) {
+ if (i != 0) {
+ binStr.append(", ");
+ }
+ int v = bin[i] & 0xff;
+ if (v < 16) {
+ binStr.append("0");
+ binStr.append(Integer.toHexString(v));
+ } else {
+ binStr.append(Integer.toHexString(v));
+ }
+ }
+ binStr.append("]");
+ }
+ return binStr.toString();
+ }
+ case DECIMAL:
+ return row.getDecimal(ord, schema.getPrecision(),
schema.getScale()).toString();
+ case DATE:
+ return "\"" + new DateWritable(row.getInt(ord)) + "\"";
+ case TIMESTAMP:
+ return "\"" + new Timestamp(row.getLong(ord)) + "\"";
+ case STRUCT:
+ return rowToString(row.getStruct(ord, schema.getChildren().size()),
schema);
+ case LIST: {
+ TypeDescription child = schema.getChildren().get(0);
+ final StringBuilder listStr = new StringBuilder("[");
+ ArrayData list = row.getArray(ord);
+ for(int e=0; e < list.numElements(); ++e) {
+ if (e != 0) {
+ listStr.append(", ");
+ }
+ listStr.append(rowEntryToString(list, e, child));
+ }
+ listStr.append("]");
+ return listStr.toString();
+ }
+ case MAP: {
+ TypeDescription keyType = schema.getChildren().get(0);
+ TypeDescription valueType = schema.getChildren().get(1);
+ MapData map = row.getMap(ord);
+ ArrayData keys = map.keyArray();
+ ArrayData values = map.valueArray();
+ StringBuilder mapStr = new StringBuilder("[");
+ for(int e=0; e < map.numElements(); ++e) {
+ if (e != 0) {
+ mapStr.append(", ");
+ }
+ mapStr.append(rowEntryToString(keys, e, keyType));
+ mapStr.append(": ");
+ mapStr.append(rowEntryToString(values, e, valueType));
+ }
+ mapStr.append("]");
+ return mapStr.toString();
+ }
+ default:
+ throw new IllegalArgumentException("Unhandled type " + schema);
+ }
+ }
+
+ private static int getArrayElementSize(TypeDescription type) {
+ switch (type.getCategory()) {
+ case BOOLEAN:
+ case BYTE:
+ return 1;
+ case SHORT:
+ return 2;
+ case INT:
+ case FLOAT:
+ return 4;
+ default:
+ return 8;
+ }
+ }
+
+ /**
+ * The common interface for converting from an ORC ColumnVector to a Spark
+ * UnsafeRow. UnsafeRows need two different interfaces for writers and thus
+ * we have two methods: the first is for structs (UnsafeRowWriter) and the
+ * second is for lists and maps (UnsafeArrayWriter). If Spark adds a common
+ * interface similar to SpecializedGetters we could use that and a single set of
+ * methods.
+ */
+ interface Converter {
+ void convert(UnsafeRowWriter writer, int column, ColumnVector vector, int
row);
+ void convert(UnsafeArrayWriter writer, int element, ColumnVector vector,
int row);
+ }
+
+ private static class BooleanConverter implements Converter {
+ @Override
+ public void convert(UnsafeRowWriter writer, int column, ColumnVector
vector, int row) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (!vector.noNulls && vector.isNull[row]) {
+ writer.setNullAt(column);
+ } else {
+ writer.write(column, ((LongColumnVector) vector).vector[row] != 0);
+ }
+ }
+
+ @Override
+ public void convert(UnsafeArrayWriter writer, int element,
+ ColumnVector vector, int row) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (!vector.noNulls && vector.isNull[row]) {
+ writer.setNull(element);
+ } else {
+ writer.write(element, ((LongColumnVector) vector).vector[row] != 0);
+ }
+ }
+ }
+
/** Reads ORC bytes (stored widened in a LongColumnVector) into Spark writers. */
private static class ByteConverter implements Converter {
  @Override
  public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
                      int row) {
    if (vector.isRepeating) {
      // a repeating vector stores its single value at index 0
      row = 0;
    }
    if (!vector.noNulls && vector.isNull[row]) {
      writer.setNullAt(column);
    } else {
      writer.write(column, (byte) ((LongColumnVector) vector).vector[row]);
    }
  }

  @Override
  public void convert(UnsafeArrayWriter writer, int element,
                      ColumnVector vector, int row) {
    if (vector.isRepeating) {
      row = 0;
    }
    if (!vector.noNulls && vector.isNull[row]) {
      writer.setNull(element);
    } else {
      writer.write(element, (byte) ((LongColumnVector) vector).vector[row]);
    }
  }
}
+
+ private static class ShortConverter implements Converter {
+ @Override
+ public void convert(UnsafeRowWriter writer, int column, ColumnVector
vector,
+ int row) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (!vector.noNulls && vector.isNull[row]) {
+ writer.setNullAt(column);
+ } else {
+ writer.write(column, (short) ((LongColumnVector) vector).vector[row]);
+ }
+ }
+
+ @Override
+ public void convert(UnsafeArrayWriter writer, int element,
+ ColumnVector vector, int row) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (!vector.noNulls && vector.isNull[row]) {
+ writer.setNull(element);
+ } else {
+ writer.write(element, (short) ((LongColumnVector) vector).vector[row]);
+ }
+ }
+ }
+
/**
 * Reads ORC ints and dates (stored widened in a LongColumnVector) into Spark
 * writers; buildConverter routes both INT and DATE here.
 */
private static class IntConverter implements Converter {
  @Override
  public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
                      int row) {
    if (vector.isRepeating) {
      // a repeating vector stores its single value at index 0
      row = 0;
    }
    if (!vector.noNulls && vector.isNull[row]) {
      writer.setNullAt(column);
    } else {
      writer.write(column, (int) ((LongColumnVector) vector).vector[row]);
    }
  }

  @Override
  public void convert(UnsafeArrayWriter writer, int element,
                      ColumnVector vector, int row) {
    if (vector.isRepeating) {
      row = 0;
    }
    if (!vector.noNulls && vector.isNull[row]) {
      writer.setNull(element);
    } else {
      writer.write(element, (int) ((LongColumnVector) vector).vector[row]);
    }
  }
}
+
+ private static class LongConverter implements Converter {
+ @Override
+ public void convert(UnsafeRowWriter writer, int column, ColumnVector
vector,
+ int row) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (!vector.noNulls && vector.isNull[row]) {
+ writer.setNullAt(column);
+ } else {
+ writer.write(column, ((LongColumnVector) vector).vector[row]);
+ }
+ }
+
+ @Override
+ public void convert(UnsafeArrayWriter writer, int element,
+ ColumnVector vector, int row) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (!vector.noNulls && vector.isNull[row]) {
+ writer.setNull(element);
+ } else {
+ writer.write(element, ((LongColumnVector) vector).vector[row]);
+ }
+ }
+ }
+
/** Reads ORC floats (stored widened in a DoubleColumnVector) into Spark writers. */
private static class FloatConverter implements Converter {
  @Override
  public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
                      int row) {
    if (vector.isRepeating) {
      // a repeating vector stores its single value at index 0
      row = 0;
    }
    if (!vector.noNulls && vector.isNull[row]) {
      writer.setNullAt(column);
    } else {
      writer.write(column, (float) ((DoubleColumnVector) vector).vector[row]);
    }
  }

  @Override
  public void convert(UnsafeArrayWriter writer, int element,
                      ColumnVector vector, int row) {
    if (vector.isRepeating) {
      row = 0;
    }
    if (!vector.noNulls && vector.isNull[row]) {
      writer.setNull(element);
    } else {
      writer.write(element, (float) ((DoubleColumnVector) vector).vector[row]);
    }
  }
}
+
+ private static class DoubleConverter implements Converter {
+ @Override
+ public void convert(UnsafeRowWriter writer, int column, ColumnVector
vector,
+ int row) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (!vector.noNulls && vector.isNull[row]) {
+ writer.setNullAt(column);
+ } else {
+ writer.write(column, ((DoubleColumnVector) vector).vector[row]);
+ }
+ }
+
+ @Override
+ public void convert(UnsafeArrayWriter writer, int element,
+ ColumnVector vector, int row) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (!vector.noNulls && vector.isNull[row]) {
+ writer.setNull(element);
+ } else {
+ writer.write(element, ((DoubleColumnVector) vector).vector[row]);
+ }
+ }
+ }
+
+ private static class TimestampConverter implements Converter {
+
+ private long convert(TimestampColumnVector vector, int row) {
+ // compute microseconds past 1970.
+ return (vector.time[row]/1000) * 1_000_000 + vector.nanos[row] / 1000;
+ }
+
+ @Override
+ public void convert(UnsafeRowWriter writer, int column, ColumnVector
vector,
+ int row) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (!vector.noNulls && vector.isNull[row]) {
+ writer.setNullAt(column);
+ } else {
+ writer.write(column, convert((TimestampColumnVector) vector, row));
+ }
+ }
+
+ @Override
+ public void convert(UnsafeArrayWriter writer, int element,
+ ColumnVector vector, int row) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (!vector.noNulls && vector.isNull[row]) {
+ writer.setNull(element);
+ } else {
+ writer.write(element, convert((TimestampColumnVector) vector, row));
+ }
+ }
+ }
+
+ private static class BinaryConverter implements Converter {
+
+ @Override
+ public void convert(UnsafeRowWriter writer, int column, ColumnVector
vector, int row) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (!vector.noNulls && vector.isNull[row]) {
+ writer.setNullAt(column);
+ } else {
+ BytesColumnVector v = (BytesColumnVector) vector;
+ writer.write(column, v.vector[row], v.start[row], v.length[row]);
+ }
+ }
+
+ @Override
+ public void convert(UnsafeArrayWriter writer, int element, ColumnVector
vector, int row) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (!vector.noNulls && vector.isNull[row]) {
+ writer.setNull(element);
+ } else {
+ final BytesColumnVector v = (BytesColumnVector) vector;
+ writer.write(element, v.vector[row], v.start[row], v.length[row]);
+ }
+ }
+ }
+
/**
 * Reads ORC decimals with precision &lt;= 18, which fit in a long, into
 * Spark Decimal values.
 */
private static class Decimal18Converter implements Converter {
  // precision and scale from the ORC schema, used when writing the Decimal
  final int precision;
  final int scale;

  Decimal18Converter(int precision, int scale) {
    this.precision = precision;
    this.scale = scale;
  }

  @Override
  public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
                      int row) {
    if (vector.isRepeating) {
      // a repeating vector stores its single value at index 0
      row = 0;
    }
    if (!vector.noNulls && vector.isNull[row]) {
      writer.setNullAt(column);
    } else {
      HiveDecimalWritable v = ((DecimalColumnVector) vector).vector[row];
      // builds the Decimal from the value's own precision/scale but writes it
      // with the schema's precision/scale.
      // NOTE(review): assumes the HiveDecimalWritable's scale matches the
      // schema scale (or that Decimal.set normalizes it) — confirm.
      writer.write(column,
          new Decimal().set(v.serialize64(v.scale()), v.precision(), v.scale()),
          precision, scale);
    }
  }

  @Override
  public void convert(UnsafeArrayWriter writer, int element,
                      ColumnVector vector, int row) {
    if (vector.isRepeating) {
      row = 0;
    }
    if (!vector.noNulls && vector.isNull[row]) {
      writer.setNull(element);
    } else {
      HiveDecimalWritable v = ((DecimalColumnVector) vector).vector[row];
      writer.write(element,
          new Decimal().set(v.serialize64(v.scale()), v.precision(), v.scale()),
          precision, scale);
    }
  }
}
+
/**
 * Reads ORC decimals with precision &gt; 18 into Spark Decimal values via
 * BigDecimal, since they do not fit in a long.
 */
private static class Decimal38Converter implements Converter {
  // precision and scale from the ORC schema, used when writing the Decimal
  final int precision;
  final int scale;

  Decimal38Converter(int precision, int scale) {
    this.precision = precision;
    this.scale = scale;
  }

  @Override
  public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
                      int row) {
    if (vector.isRepeating) {
      // a repeating vector stores its single value at index 0
      row = 0;
    }
    if (!vector.noNulls && vector.isNull[row]) {
      writer.setNullAt(column);
    } else {
      BigDecimal v = ((DecimalColumnVector) vector).vector[row]
          .getHiveDecimal().bigDecimalValue();
      writer.write(column,
          new Decimal().set(new scala.math.BigDecimal(v), precision, scale),
          precision, scale);
    }
  }

  @Override
  public void convert(UnsafeArrayWriter writer, int element,
                      ColumnVector vector, int row) {
    if (vector.isRepeating) {
      row = 0;
    }
    if (!vector.noNulls && vector.isNull[row]) {
      writer.setNull(element);
    } else {
      BigDecimal v = ((DecimalColumnVector) vector).vector[row]
          .getHiveDecimal().bigDecimalValue();
      writer.write(element,
          new Decimal().set(new scala.math.BigDecimal(v), precision, scale),
          precision, scale);
    }
  }
}
+
/**
 * Reads ORC structs into nested Spark UnsafeRow values. The struct is
 * serialized into the parent writer's buffer by a child UnsafeRowWriter,
 * and the parent field stores the offset and size of those bytes.
 */
private static class StructConverter implements Converter {
  private final Converter[] children;
  private final UnsafeRowWriter childWriter;

  StructConverter(final UnsafeWriter parentWriter, final TypeDescription schema) {
    children = new Converter[schema.getChildren().size()];
    for(int c=0; c < children.length; ++c) {
      children[c] = buildConverter(parentWriter, schema.getChildren().get(c));
    }
    // shares the parent's buffer so the struct is serialized inline
    childWriter = new UnsafeRowWriter(parentWriter, children.length);
  }

  // Serializes one struct value and returns the buffer offset where it starts.
  int writeStruct(StructColumnVector vector, int row) {
    int start = childWriter.cursor();
    childWriter.resetRowWriter();
    for(int c=0; c < children.length; ++c) {
      children[c].convert(childWriter, c, vector.fields[c], row);
    }
    return start;
  }

  @Override
  public void convert(UnsafeRowWriter writer, int column, ColumnVector vector, int row) {
    if (vector.isRepeating) {
      // a repeating vector stores its single value at index 0
      row = 0;
    }
    if (!vector.noNulls && vector.isNull[row]) {
      writer.setNullAt(column);
    } else {
      int start = writeStruct((StructColumnVector) vector, row);
      // records where the serialized struct lives relative to the parent
      writer.setOffsetAndSizeFromPreviousCursor(column, start);
    }
  }

  @Override
  public void convert(UnsafeArrayWriter writer, int element,
                      ColumnVector vector, int row) {
    if (vector.isRepeating) {
      row = 0;
    }
    if (!vector.noNulls && vector.isNull[row]) {
      writer.setNull(element);
    } else {
      int start = writeStruct((StructColumnVector) vector, row);
      writer.setOffsetAndSizeFromPreviousCursor(element, start);
    }
  }
}
+
/**
 * Reads ORC lists into nested Spark UnsafeArrayData values. The elements are
 * serialized into the parent writer's buffer by a child UnsafeArrayWriter,
 * and the parent field stores the offset and size of those bytes.
 */
private static class ListConverter implements Converter {
  private final Converter children;
  private final UnsafeArrayWriter childWriter;

  ListConverter(final UnsafeWriter parentWriter, TypeDescription schema) {
    TypeDescription child = schema.getChildren().get(0);
    children = buildConverter(parentWriter, child);
    childWriter = new UnsafeArrayWriter(parentWriter, getArrayElementSize(child));
  }

  // Serializes one list value and returns the buffer offset where it starts.
  int writeList(ListColumnVector v, int row) {
    // ORC lists are stored as a flattened child vector plus per-row
    // offset/length pairs
    int offset = (int) v.offsets[row];
    int length = (int) v.lengths[row];
    int start = childWriter.cursor();
    childWriter.initialize(length);
    for(int c = 0; c < length; ++c) {
      children.convert(childWriter, c, v.child, offset + c);
    }
    return start;
  }

  @Override
  public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
                      int row) {
    if (vector.isRepeating) {
      // a repeating vector stores its single value at index 0
      row = 0;
    }
    if (!vector.noNulls && vector.isNull[row]) {
      writer.setNullAt(column);
    } else {
      int start = writeList((ListColumnVector) vector, row);
      writer.setOffsetAndSizeFromPreviousCursor(column, start);
    }
  }

  @Override
  public void convert(UnsafeArrayWriter writer, int element,
                      ColumnVector vector, int row) {
    if (vector.isRepeating) {
      row = 0;
    }
    if (!vector.noNulls && vector.isNull[row]) {
      writer.setNull(element);
    } else {
      int start = writeList((ListColumnVector) vector, row);
      writer.setOffsetAndSizeFromPreviousCursor(element, start);
    }
  }
}
+
+ private static class MapConverter implements Converter {
+ private final Converter keyConvert;
+ private final Converter valueConvert;
+
+ private final UnsafeArrayWriter keyWriter;
+ private final UnsafeArrayWriter valueWriter;
+
+ private final int keySize;
+ private final int valueSize;
+
+ private final int KEY_SIZE_BYTES = 8;
+
+ MapConverter(final UnsafeWriter parentWriter, TypeDescription schema) {
+ final TypeDescription keyType = schema.getChildren().get(0);
+ final TypeDescription valueType = schema.getChildren().get(1);
+ keyConvert = buildConverter(parentWriter, keyType);
+ keySize = getArrayElementSize(keyType);
+ keyWriter = new UnsafeArrayWriter(parentWriter, keySize);
+ valueConvert = buildConverter(parentWriter, valueType);
+ valueSize = getArrayElementSize(valueType);
+ valueWriter = new UnsafeArrayWriter(parentWriter, valueSize);
+ }
+
+ int writeMap(MapColumnVector v, int row) {
+ final int offset = (int) v.offsets[row];
+ final int length = (int) v.lengths[row];
+ final int start = keyWriter.cursor();
+
+ // save room for the key size
+ keyWriter.grow(KEY_SIZE_BYTES);
+ keyWriter.increaseCursor(KEY_SIZE_BYTES);
+
+ // serialize the keys
+ keyWriter.initialize(length);
+ for(int c = 0; c < length; ++c) {
+ keyConvert.convert(keyWriter, c, v.keys, offset + c);
+ }
+ // store the serialized size of the keys
+ Platform.putLong(keyWriter.getBuffer(), start,
+ keyWriter.cursor() - start - KEY_SIZE_BYTES);
+
+ // serialize the values
+ valueWriter.initialize(length);
+ for(int c = 0; c < length; ++c) {
+ valueConvert.convert(valueWriter, c, v.values, offset + c);
+ }
+ return start;
+ }
+
+ @Override
+ public void convert(UnsafeRowWriter writer, int column, ColumnVector
vector, int row) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (!vector.noNulls && vector.isNull[row]) {
+ writer.setNullAt(column);
+ } else {
+ int start = writeMap((MapColumnVector) vector, row);
+ writer.setOffsetAndSizeFromPreviousCursor(column, start);
+ }
+ }
+
+ @Override
+ public void convert(UnsafeArrayWriter writer, int element, ColumnVector
vector, int row) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (!vector.noNulls && vector.isNull[row]) {
+ writer.setNull(element);
+ } else {
+ int start = writeMap((MapColumnVector) vector, row);
+ writer.setOffsetAndSizeFromPreviousCursor(element, start);
+ }
+ }
+ }
+
+ static Converter buildConverter(final UnsafeWriter writer, final
TypeDescription schema) {
+ switch (schema.getCategory()) {
+ case BOOLEAN:
+ return new BooleanConverter();
+ case BYTE:
+ return new ByteConverter();
+ case SHORT:
+ return new ShortConverter();
+ case DATE:
+ case INT:
+ return new IntConverter();
+ case LONG:
+ return new LongConverter();
+ case FLOAT:
+ return new FloatConverter();
+ case DOUBLE:
+ return new DoubleConverter();
+ case TIMESTAMP:
+ return new TimestampConverter();
+ case DECIMAL:
+ if (schema.getPrecision() <= Decimal.MAX_LONG_DIGITS()) {
+ return new Decimal18Converter(schema.getPrecision(),
schema.getScale());
+ } else {
+ return new Decimal38Converter(schema.getPrecision(),
schema.getScale());
+ }
+ case BINARY:
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ return new BinaryConverter();
+ case STRUCT:
+ return new StructConverter(writer, schema);
+ case LIST:
+ return new ListConverter(writer, schema);
+ case MAP:
+ return new MapConverter(writer, schema);
+ default:
+ throw new IllegalArgumentException("Unhandled type " + schema);
+ }
+ }
+}
diff --git
a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
new file mode 100644
index 0000000..80e2878
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data;
+
+import org.apache.iceberg.orc.OrcValueWriter;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.common.type.HiveDecimal;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+
+import java.util.List;
+
+/**
+ * This class acts as an adaptor from an OrcFileAppender to a
+ * {@code FileAppender<InternalRow>}.
+ */
public class SparkOrcWriter implements OrcValueWriter<InternalRow> {

  // one converter per top-level column of the file schema
  private final Converter[] converters;

  public SparkOrcWriter(TypeDescription schema) {
    converters = buildConverters(schema);
  }

  @Override
  public void write(InternalRow value, VectorizedRowBatch output) {
    // claim the next row slot in the batch, then copy each column into it
    int row = output.size++;
    for(int c=0; c < converters.length; ++c) {
      converters[c].addValue(row, c, value, output.cols[c]);
    }
  }

  /**
   * The interface for the conversion from Spark's SpecializedGetters to
   * ORC's ColumnVectors.
   */
  interface Converter {
    /**
     * Take a value from the Spark data value and add it to the ORC output.
     * @param rowId the row in the ColumnVector
     * @param column either the column number or element number
     * @param data either an InternalRow or ArrayData
     * @param output the ColumnVector to put the value into
     */
    void addValue(int rowId, int column, SpecializedGetters data,
                  ColumnVector output);
  }

  /** Writes booleans as 0/1 longs into a LongColumnVector. */
  static class BooleanConverter implements Converter {
    public void addValue(int rowId, int column, SpecializedGetters data,
                         ColumnVector output) {
      if (data.isNullAt(column)) {
        output.noNulls = false;
        output.isNull[rowId] = true;
      } else {
        output.isNull[rowId] = false;
        ((LongColumnVector) output).vector[rowId] = data.getBoolean(column) ? 1 : 0;
      }
    }
  }

  /** Writes bytes widened to longs into a LongColumnVector. */
  static class ByteConverter implements Converter {
    public void addValue(int rowId, int column, SpecializedGetters data,
                         ColumnVector output) {
      if (data.isNullAt(column)) {
        output.noNulls = false;
        output.isNull[rowId] = true;
      } else {
        output.isNull[rowId] = false;
        ((LongColumnVector) output).vector[rowId] = data.getByte(column);
      }
    }
  }

  /** Writes shorts widened to longs into a LongColumnVector. */
  static class ShortConverter implements Converter {
    public void addValue(int rowId, int column, SpecializedGetters data,
                         ColumnVector output) {
      if (data.isNullAt(column)) {
        output.noNulls = false;
        output.isNull[rowId] = true;
      } else {
        output.isNull[rowId] = false;
        ((LongColumnVector) output).vector[rowId] = data.getShort(column);
      }
    }
  }

  /** Writes ints (and dates, routed here by buildConverter) into a LongColumnVector. */
  static class IntConverter implements Converter {
    public void addValue(int rowId, int column, SpecializedGetters data,
                         ColumnVector output) {
      if (data.isNullAt(column)) {
        output.noNulls = false;
        output.isNull[rowId] = true;
      } else {
        output.isNull[rowId] = false;
        ((LongColumnVector) output).vector[rowId] = data.getInt(column);
      }
    }
  }

  /** Writes longs into a LongColumnVector. */
  static class LongConverter implements Converter {
    public void addValue(int rowId, int column, SpecializedGetters data,
                         ColumnVector output) {
      if (data.isNullAt(column)) {
        output.noNulls = false;
        output.isNull[rowId] = true;
      } else {
        output.isNull[rowId] = false;
        ((LongColumnVector) output).vector[rowId] = data.getLong(column);
      }
    }
  }

  /** Writes floats widened to doubles into a DoubleColumnVector. */
  static class FloatConverter implements Converter {
    public void addValue(int rowId, int column, SpecializedGetters data,
                         ColumnVector output) {
      if (data.isNullAt(column)) {
        output.noNulls = false;
        output.isNull[rowId] = true;
      } else {
        output.isNull[rowId] = false;
        ((DoubleColumnVector) output).vector[rowId] = data.getFloat(column);
      }
    }
  }

  /** Writes doubles into a DoubleColumnVector. */
  static class DoubleConverter implements Converter {
    public void addValue(int rowId, int column, SpecializedGetters data,
                         ColumnVector output) {
      if (data.isNullAt(column)) {
        output.noNulls = false;
        output.isNull[rowId] = true;
      } else {
        output.isNull[rowId] = false;
        ((DoubleColumnVector) output).vector[rowId] = data.getDouble(column);
      }
    }
  }

  /** Writes string/char/varchar values as UTF-8 bytes into a BytesColumnVector. */
  static class StringConverter implements Converter {
    public void addValue(int rowId, int column, SpecializedGetters data,
                         ColumnVector output) {
      if (data.isNullAt(column)) {
        output.noNulls = false;
        output.isNull[rowId] = true;
      } else {
        output.isNull[rowId] = false;
        // NOTE(review): setRef keeps a reference rather than copying;
        // assumes UTF8String.getBytes returns an array that is not mutated
        // before the batch is flushed — confirm.
        byte[] value = data.getUTF8String(column).getBytes();
        ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
      }
    }
  }

  /** Writes binary values into a BytesColumnVector. */
  static class BytesConverter implements Converter {
    public void addValue(int rowId, int column, SpecializedGetters data,
                         ColumnVector output) {
      if (data.isNullAt(column)) {
        output.noNulls = false;
        output.isNull[rowId] = true;
      } else {
        output.isNull[rowId] = false;
        // getBinary always makes a copy, so we don't need to worry about it
        // being changed behind our back.
        byte[] value = data.getBinary(column);
        ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
      }
    }
  }

  /**
   * Writes Spark timestamps (microseconds since epoch) as ORC's
   * milliseconds-plus-nanos-of-second representation.
   */
  static class TimestampConverter implements Converter {

    public void addValue(int rowId, int column, SpecializedGetters data,
                         ColumnVector output) {
      if (data.isNullAt(column)) {
        output.noNulls = false;
        output.isNull[rowId] = true;
      } else {
        output.isNull[rowId] = false;
        TimestampColumnVector cv = (TimestampColumnVector) output;
        long micros = data.getLong(column);
        // whole seconds in milliseconds
        cv.time[rowId] = (micros / 1_000_000) * 1000;
        int nanos = (int) (micros % 1_000_000) * 1000;
        if (nanos < 0) {
          // pre-1970 values: shift the negative sub-second part into a
          // positive nanos-of-second and borrow one second from time
          nanos += 1_000_000_000;
          cv.time[rowId] -= 1000;
        }
        cv.nanos[rowId] = nanos;
      }
    }
  }

  /** Writes decimals with precision &lt;= 18 from their unscaled long form. */
  static class Decimal18Converter implements Converter {
    private final int precision;
    private final int scale;

    Decimal18Converter(TypeDescription schema) {
      precision = schema.getPrecision();
      scale = schema.getScale();
    }

    public void addValue(int rowId, int column, SpecializedGetters data,
                         ColumnVector output) {
      if (data.isNullAt(column)) {
        output.noNulls = false;
        output.isNull[rowId] = true;
      } else {
        output.isNull[rowId] = false;
        ((DecimalColumnVector) output).vector[rowId].setFromLongAndScale(
            data.getDecimal(column, precision, scale).toUnscaledLong(), scale);
      }
    }
  }

  /** Writes decimals with precision &gt; 18 via BigDecimal. */
  static class Decimal38Converter implements Converter {
    private final int precision;
    private final int scale;

    Decimal38Converter(TypeDescription schema) {
      precision = schema.getPrecision();
      scale = schema.getScale();
    }
    public void addValue(int rowId, int column, SpecializedGetters data,
                         ColumnVector output) {
      if (data.isNullAt(column)) {
        output.noNulls = false;
        output.isNull[rowId] = true;
      } else {
        output.isNull[rowId] = false;
        ((DecimalColumnVector) output).vector[rowId].set(
            HiveDecimal.create(data.getDecimal(column, precision, scale)
                .toJavaBigDecimal()));
      }
    }
  }

  /** Writes structs by recursing into each field's column vector. */
  static class StructConverter implements Converter {
    private final Converter[] children;

    StructConverter(TypeDescription schema) {
      children = new Converter[schema.getChildren().size()];
      for(int c=0; c < children.length; ++c) {
        children[c] = buildConverter(schema.getChildren().get(c));
      }
    }

    public void addValue(int rowId, int column, SpecializedGetters data,
                         ColumnVector output) {
      if (data.isNullAt(column)) {
        output.noNulls = false;
        output.isNull[rowId] = true;
      } else {
        output.isNull[rowId] = false;
        InternalRow value = data.getStruct(column, children.length);
        StructColumnVector cv = (StructColumnVector) output;
        for(int c=0; c < children.length; ++c) {
          children[c].addValue(rowId, c, value, cv.fields[c]);
        }
      }
    }
  }

  /** Writes lists into ORC's flattened child vector plus offset/length pairs. */
  static class ListConverter implements Converter {
    private final Converter children;

    ListConverter(TypeDescription schema) {
      children = buildConverter(schema.getChildren().get(0));
    }

    public void addValue(int rowId, int column, SpecializedGetters data,
                         ColumnVector output) {
      if (data.isNullAt(column)) {
        output.noNulls = false;
        output.isNull[rowId] = true;
      } else {
        output.isNull[rowId] = false;
        ArrayData value = data.getArray(column);
        ListColumnVector cv = (ListColumnVector) output;
        // record the length and start of the list elements
        cv.lengths[rowId] = value.numElements();
        cv.offsets[rowId] = cv.childCount;
        cv.childCount += cv.lengths[rowId];
        // make sure the child is big enough
        cv.child.ensureSize(cv.childCount, true);
        // Add each element
        for(int e=0; e < cv.lengths[rowId]; ++e) {
          children.addValue((int) (e + cv.offsets[rowId]), e, value, cv.child);
        }
      }
    }
  }

  /** Writes maps into ORC's parallel key/value child vectors plus offset/length pairs. */
  static class MapConverter implements Converter {
    private final Converter keyConverter;
    private final Converter valueConverter;

    MapConverter(TypeDescription schema) {
      keyConverter = buildConverter(schema.getChildren().get(0));
      valueConverter = buildConverter(schema.getChildren().get(1));
    }

    public void addValue(int rowId, int column, SpecializedGetters data,
                         ColumnVector output) {
      if (data.isNullAt(column)) {
        output.noNulls = false;
        output.isNull[rowId] = true;
      } else {
        output.isNull[rowId] = false;
        MapData map = data.getMap(column);
        ArrayData key = map.keyArray();
        ArrayData value = map.valueArray();
        MapColumnVector cv = (MapColumnVector) output;
        // record the length and start of the list elements
        // (keyArray and valueArray of a MapData have the same length)
        cv.lengths[rowId] = value.numElements();
        cv.offsets[rowId] = cv.childCount;
        cv.childCount += cv.lengths[rowId];
        // make sure the child is big enough
        cv.keys.ensureSize(cv.childCount, true);
        cv.values.ensureSize(cv.childCount, true);
        // Add each element
        for(int e=0; e < cv.lengths[rowId]; ++e) {
          int pos = (int)(e + cv.offsets[rowId]);
          keyConverter.addValue(pos, e, key, cv.keys);
          valueConverter.addValue(pos, e, value, cv.values);
        }
      }
    }
  }

  // Builds a converter that serializes one Spark value of the given ORC type.
  private static Converter buildConverter(TypeDescription schema) {
    switch (schema.getCategory()) {
      case BOOLEAN:
        return new BooleanConverter();
      case BYTE:
        return new ByteConverter();
      case SHORT:
        return new ShortConverter();
      case DATE:
      case INT:
        return new IntConverter();
      case LONG:
        return new LongConverter();
      case FLOAT:
        return new FloatConverter();
      case DOUBLE:
        return new DoubleConverter();
      case BINARY:
        return new BytesConverter();
      case STRING:
      case CHAR:
      case VARCHAR:
        return new StringConverter();
      case DECIMAL:
        // decimals that fit in a long use the faster converter
        return schema.getPrecision() <= 18
            ? new Decimal18Converter(schema)
            : new Decimal38Converter(schema);
      case TIMESTAMP:
        return new TimestampConverter();
      case STRUCT:
        return new StructConverter(schema);
      case LIST:
        return new ListConverter(schema);
      case MAP:
        return new MapConverter(schema);
    }
    throw new IllegalArgumentException("Unhandled type " + schema);
  }

  // Builds one converter per top-level column; the root must be a struct.
  private static Converter[] buildConverters(TypeDescription schema) {
    if (schema.getCategory() != TypeDescription.Category.STRUCT) {
      throw new IllegalArgumentException("Top level must be a struct " + schema);
    }
    List<TypeDescription> children = schema.getChildren();
    Converter[] result = new Converter[children.size()];
    for(int c=0; c < children.size(); ++c) {
      result[c] = buildConverter(children.get(c));
    }
    return result;
  }

}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index a74d9cd..63a33f9 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -51,6 +51,7 @@ import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.spark.SparkFilters;
import org.apache.iceberg.spark.SparkSchemaUtil;
@@ -59,6 +60,7 @@ import org.apache.iceberg.spark.data.SparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.spark.data.SparkOrcReader;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.AttributeReference;
@@ -433,6 +435,10 @@ class Reader implements DataSourceReader,
SupportsPushDownFilters, SupportsPushD
iter = newAvroIterable(location, task, readSchema);
break;
+ case ORC:
+ iter = newOrcIterable(location, task, readSchema);
+ break;
+
default:
throw new UnsupportedOperationException(
"Cannot read unknown format: " + task.file().format());
@@ -465,6 +471,17 @@ class Reader implements DataSourceReader,
SupportsPushDownFilters, SupportsPushD
.caseSensitive(caseSensitive)
.build();
}
+
/**
 * Builds a closeable iterable over the InternalRows of an ORC data file,
 * projecting the requested read schema and restricted to the task's split.
 */
private CloseableIterable<InternalRow> newOrcIterable(InputFile location,
                                                      FileScanTask task,
                                                      Schema readSchema) {
  return ORC.read(location)
      .schema(readSchema)
      .split(task.start(), task.length())
      .createReaderFunc(SparkOrcReader::new)
      .caseSensitive(caseSensitive)
      .build();
}
}
private static class PartitionRowConverter implements Function<StructLike,
InternalRow> {
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index a760455..69d0d84 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -52,6 +52,7 @@ import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.unsafe.types.UTF8String;
import org.junit.Assert;
import scala.collection.Seq;
@@ -594,6 +595,8 @@ public class TestHelpers {
actual instanceof MapData);
assertEquals(context, (MapType) type, (MapData) expected, (MapData)
actual);
+ } else if (type instanceof BinaryType) {
+ assertEqualBytes(context, (byte[]) expected, (byte[]) actual);
} else {
Assert.assertEquals("Value should match expected: " + context, expected,
actual);
}
diff --git
a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java
b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java
new file mode 100644
index 0000000..1a20ff8
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data;
+
+import static org.apache.iceberg.spark.data.TestHelpers.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.orc.ORC;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.Assert;
+
+public class TestSparkOrcReader extends AvroDataTest {
+ @Override
+ protected void writeAndValidate(Schema schema) throws IOException {
+ final Iterable<InternalRow> expected = RandomData
+ .generateSpark(schema, 100, 0L);
+
+ final File testFile = temp.newFile();
+ Assert.assertTrue("Delete should succeed", testFile.delete());
+
+ try (FileAppender<InternalRow> writer =
ORC.write(Files.localOutput(testFile))
+ .createWriterFunc(SparkOrcWriter::new)
+ .schema(schema)
+ .build()) {
+ writer.addAll(expected);
+ }
+
+ try (CloseableIterable<InternalRow> reader =
ORC.read(Files.localInput(testFile))
+ .schema(schema)
+ .createReaderFunc(SparkOrcReader::new)
+ .build()) {
+ final Iterator<InternalRow> actualRows = reader.iterator();
+ final Iterator<InternalRow> expectedRows = expected.iterator();
+ while (expectedRows.hasNext()) {
+ Assert.assertTrue("Should have expected number of rows",
actualRows.hasNext());
+ assertEquals(schema, expectedRows.next(), actualRows.next());
+ }
+ Assert.assertFalse("Should not have extra rows", actualRows.hasNext());
+ }
+ }
+}