This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 75707d0c68 Orc: Obtain ORC stripe offsets from writer (#5778)
75707d0c68 is described below
commit 75707d0c686f9ac2f3d8be826b6c32402173bf3d
Author: Pavan Lanka <[email protected]>
AuthorDate: Wed Sep 28 13:11:26 2022 -0700
Orc: Obtain ORC stripe offsets from writer (#5778)
Closes #5777
---
build.gradle | 1 +
.../org/apache/iceberg/orc/OrcFileAppender.java | 8 +-
.../org/apache/iceberg/orc/TestOrcDataWriter.java | 129 +++++++++++++++++++++
3 files changed, 134 insertions(+), 4 deletions(-)
diff --git a/build.gradle b/build.gradle
index ba8b07fef0..78bcea8880 100644
--- a/build.gradle
+++ b/build.gradle
@@ -542,6 +542,7 @@ project(':iceberg-orc') {
}
testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
+ testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
testImplementation project(':iceberg-common')
}
}
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
index f407bdcf43..a2c0d2ccea 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
@@ -37,7 +37,6 @@ import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.orc.OrcFile;
-import org.apache.orc.Reader;
import org.apache.orc.StripeInformation;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
@@ -147,11 +146,12 @@ class OrcFileAppender<D> implements FileAppender<D> {
@Override
public List<Long> splitOffsets() {
Preconditions.checkState(isClosed, "File is not yet closed");
- try (Reader reader = ORC.newFileReader(file.toInputFile(), conf)) {
- List<StripeInformation> stripes = reader.getStripes();
+ try {
+ List<StripeInformation> stripes = writer.getStripes();
return Collections.unmodifiableList(Lists.transform(stripes, StripeInformation::getOffset));
} catch (IOException e) {
- throw new RuntimeIOException(e, "Can't close ORC reader %s", file.location());
+ throw new RuntimeIOException(
+ e, "Failed to get stripe information from writer for: %s", file.location());
}
}
diff --git a/orc/src/test/java/org/apache/iceberg/orc/TestOrcDataWriter.java b/orc/src/test/java/org/apache/iceberg/orc/TestOrcDataWriter.java
new file mode 100644
index 0000000000..5da7fa7b09
--- /dev/null
+++ b/orc/src/test/java/org/apache/iceberg/orc/TestOrcDataWriter.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.orc;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.OrcFile;
+import org.apache.orc.StripeInformation;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestOrcDataWriter {
+ private static final Schema SCHEMA =
+ new Schema(
+ Types.NestedField.required(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "data", Types.StringType.get()),
+ Types.NestedField.optional(3, "binary", Types.BinaryType.get()));
+
+ private List<Record> records;
+
+ @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+ @Before
+ public void createRecords() {
+ GenericRecord record = GenericRecord.create(SCHEMA);
+
+ ImmutableList.Builder<Record> builder = ImmutableList.builder();
+ builder.add(record.copy(ImmutableMap.of("id", 1L, "data", "a")));
+ builder.add(record.copy(ImmutableMap.of("id", 2L, "data", "b")));
+ builder.add(record.copy(ImmutableMap.of("id", 3L, "data", "c")));
+ builder.add(record.copy(ImmutableMap.of("id", 4L, "data", "d")));
+ builder.add(record.copy(ImmutableMap.of("id", 5L, "data", "e")));
+
+ this.records = builder.build();
+ }
+
+ private List<Long> stripeOffsetsFromReader(DataFile dataFile) throws IOException {
+ return OrcFile.createReader(
+ new Path(dataFile.path().toString()), OrcFile.readerOptions(new Configuration()))
+ .getStripes().stream()
+ .map(StripeInformation::getOffset)
+ .collect(Collectors.toList());
+ }
+
+ @Test
+ public void testDataWriter() throws IOException {
+ OutputFile file = Files.localOutput(temp.newFile());
+
+ SortOrder sortOrder = SortOrder.builderFor(SCHEMA).withOrderId(10).asc("id").build();
+
+ DataWriter<Record> dataWriter =
+ ORC.writeData(file)
+ .schema(SCHEMA)
+ .createWriterFunc(GenericOrcWriter::buildWriter)
+ .overwrite()
+ .withSpec(PartitionSpec.unpartitioned())
+ .withSortOrder(sortOrder)
+ .build();
+
+ try {
+ for (Record record : records) {
+ dataWriter.write(record);
+ }
+ } finally {
+ dataWriter.close();
+ }
+
+ DataFile dataFile = dataWriter.toDataFile();
+ Assert.assertEquals(dataFile.splitOffsets(), stripeOffsetsFromReader(dataFile));
+ Assert.assertEquals("Format should be ORC", FileFormat.ORC, dataFile.format());
+ Assert.assertEquals("Should be data file", FileContent.DATA, dataFile.content());
+ Assert.assertEquals("Record count should match", records.size(), dataFile.recordCount());
+ Assert.assertEquals("Partition should be empty", 0, dataFile.partition().size());
+ Assert.assertEquals(
+ "Sort order should match", sortOrder.orderId(), (int) dataFile.sortOrderId());
+ Assert.assertNull("Key metadata should be null", dataFile.keyMetadata());
+
+ List<Record> writtenRecords;
+ try (CloseableIterable<Record> reader =
+ ORC.read(file.toInputFile())
+ .project(SCHEMA)
+ .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(SCHEMA, fileSchema))
+ .build()) {
+ writtenRecords = Lists.newArrayList(reader);
+ }
+
+ Assert.assertEquals("Written records should match", records, writtenRecords);
+ }
+}