This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 75707d0c68 Orc: Obtain ORC stripe offsets from writer (#5778)
75707d0c68 is described below

commit 75707d0c686f9ac2f3d8be826b6c32402173bf3d
Author: Pavan Lanka <[email protected]>
AuthorDate: Wed Sep 28 13:11:26 2022 -0700

    Orc: Obtain ORC stripe offsets from writer (#5778)
    
    Closes #5777
---
 build.gradle                                       |   1 +
 .../org/apache/iceberg/orc/OrcFileAppender.java    |   8 +-
 .../org/apache/iceberg/orc/TestOrcDataWriter.java  | 129 +++++++++++++++++++++
 3 files changed, 134 insertions(+), 4 deletions(-)

diff --git a/build.gradle b/build.gradle
index ba8b07fef0..78bcea8880 100644
--- a/build.gradle
+++ b/build.gradle
@@ -542,6 +542,7 @@ project(':iceberg-orc') {
     }
 
     testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
+    testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
     testImplementation project(':iceberg-common')
   }
 }
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
index f407bdcf43..a2c0d2ccea 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
@@ -37,7 +37,6 @@ import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.orc.OrcFile;
-import org.apache.orc.Reader;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
@@ -147,11 +146,12 @@ class OrcFileAppender<D> implements FileAppender<D> {
   @Override
   public List<Long> splitOffsets() {
     Preconditions.checkState(isClosed, "File is not yet closed");
-    try (Reader reader = ORC.newFileReader(file.toInputFile(), conf)) {
-      List<StripeInformation> stripes = reader.getStripes();
+    try {
+      List<StripeInformation> stripes = writer.getStripes();
       return Collections.unmodifiableList(Lists.transform(stripes, StripeInformation::getOffset));
     } catch (IOException e) {
-      throw new RuntimeIOException(e, "Can't close ORC reader %s", file.location());
+      throw new RuntimeIOException(
+          e, "Failed to get stripe information from writer for: %s", file.location());
     }
   }
 
diff --git a/orc/src/test/java/org/apache/iceberg/orc/TestOrcDataWriter.java b/orc/src/test/java/org/apache/iceberg/orc/TestOrcDataWriter.java
new file mode 100644
index 0000000000..5da7fa7b09
--- /dev/null
+++ b/orc/src/test/java/org/apache/iceberg/orc/TestOrcDataWriter.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.orc;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.OrcFile;
+import org.apache.orc.StripeInformation;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestOrcDataWriter {
+  private static final Schema SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.LongType.get()),
+          Types.NestedField.optional(2, "data", Types.StringType.get()),
+          Types.NestedField.optional(3, "binary", Types.BinaryType.get()));
+
+  private List<Record> records;
+
+  @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+  @Before
+  public void createRecords() {
+    GenericRecord record = GenericRecord.create(SCHEMA);
+
+    ImmutableList.Builder<Record> builder = ImmutableList.builder();
+    builder.add(record.copy(ImmutableMap.of("id", 1L, "data", "a")));
+    builder.add(record.copy(ImmutableMap.of("id", 2L, "data", "b")));
+    builder.add(record.copy(ImmutableMap.of("id", 3L, "data", "c")));
+    builder.add(record.copy(ImmutableMap.of("id", 4L, "data", "d")));
+    builder.add(record.copy(ImmutableMap.of("id", 5L, "data", "e")));
+
+    this.records = builder.build();
+  }
+
+  private List<Long> stripeOffsetsFromReader(DataFile dataFile) throws IOException {
+    return OrcFile.createReader(
+            new Path(dataFile.path().toString()), OrcFile.readerOptions(new Configuration()))
+        .getStripes().stream()
+        .map(StripeInformation::getOffset)
+        .collect(Collectors.toList());
+  }
+
+  @Test
+  public void testDataWriter() throws IOException {
+    OutputFile file = Files.localOutput(temp.newFile());
+
+    SortOrder sortOrder = SortOrder.builderFor(SCHEMA).withOrderId(10).asc("id").build();
+
+    DataWriter<Record> dataWriter =
+        ORC.writeData(file)
+            .schema(SCHEMA)
+            .createWriterFunc(GenericOrcWriter::buildWriter)
+            .overwrite()
+            .withSpec(PartitionSpec.unpartitioned())
+            .withSortOrder(sortOrder)
+            .build();
+
+    try {
+      for (Record record : records) {
+        dataWriter.write(record);
+      }
+    } finally {
+      dataWriter.close();
+    }
+
+    DataFile dataFile = dataWriter.toDataFile();
+    Assert.assertEquals(dataFile.splitOffsets(), stripeOffsetsFromReader(dataFile));
+    Assert.assertEquals("Format should be ORC", FileFormat.ORC, dataFile.format());
+    Assert.assertEquals("Should be data file", FileContent.DATA, dataFile.content());
+    Assert.assertEquals("Record count should match", records.size(), dataFile.recordCount());
+    Assert.assertEquals("Partition should be empty", 0, dataFile.partition().size());
+    Assert.assertEquals(
+        "Sort order should match", sortOrder.orderId(), (int) dataFile.sortOrderId());
+    Assert.assertNull("Key metadata should be null", dataFile.keyMetadata());
+
+    List<Record> writtenRecords;
+    try (CloseableIterable<Record> reader =
+        ORC.read(file.toInputFile())
+            .project(SCHEMA)
+            .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(SCHEMA, fileSchema))
+            .build()) {
+      writtenRecords = Lists.newArrayList(reader);
+    }
+
+    Assert.assertEquals("Written records should match", records, writtenRecords);
+  }
+}

Reply via email to