This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 2dfc455  Spark: support ORC writes (#857)
2dfc455 is described below

commit 2dfc4551182f5b210755792b6624a3f5a20263e3
Author: dingxiaokun <[email protected]>
AuthorDate: Sat Apr 4 08:02:59 2020 +0800

    Spark: support ORC writes (#857)
    
    Fixes #855.
---
 .../org/apache/iceberg/spark/source/Writer.java    | 14 +++-
 ...stParquetWrite.java => TestSparkDataWrite.java} | 88 ++++++++++++++++------
 2 files changed, 77 insertions(+), 25 deletions(-)

diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index 844c681..88f8a24 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -52,8 +52,10 @@ import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.spark.data.SparkAvroWriter;
+import org.apache.iceberg.spark.data.SparkOrcWriter;
 import org.apache.iceberg.spark.data.SparkParquetWriters;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
@@ -309,6 +311,14 @@ class Writer implements DataSourceWriter {
                   .overwrite()
                   .build();
 
+            case ORC:
+              return ORC.write(file)
+                  .createWriterFunc(SparkOrcWriter::new)
+                  .setAll(properties)
+                  .schema(dsSchema)
+                  .overwrite()
+                  .build();
+
             default:
               throw new UnsupportedOperationException("Cannot write unknown format: " + fileFormat);
           }
@@ -389,7 +399,9 @@ class Writer implements DataSourceWriter {
     public abstract void write(InternalRow row) throws IOException;
 
     public void writeInternal(InternalRow row)  throws IOException {
-      if (currentRows % ROWS_DIVISOR == 0 && currentAppender.length() >= targetFileSize) {
+      //TODO: ORC file now not support target file size before closed
+      if  (!format.equals(FileFormat.ORC) &&
+          currentRows % ROWS_DIVISOR == 0 && currentAppender.length() >= targetFileSize) {
         closeCurrent();
         openCurrent();
       }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
similarity index 81%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
rename to spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 7fa335a..f6bcb66 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -23,8 +23,10 @@ import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.Locale;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestReader;
 import org.apache.iceberg.PartitionSpec;
@@ -43,11 +45,16 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
 
-public class TestParquetWrite {
+@RunWith(Parameterized.class)
+public class TestSparkDataWrite {
   private static final Configuration CONF = new Configuration();
+  private final FileFormat format;
+  private static SparkSession spark = null;
   private static final Schema SCHEMA = new Schema(
       optional(1, "id", Types.IntegerType.get()),
       optional(2, "data", Types.StringType.get())
@@ -56,23 +63,34 @@ public class TestParquetWrite {
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
 
-  private static SparkSession spark = null;
+  @Parameterized.Parameters
+  public static Object[][] parameters() {
+    return new Object[][] {
+        new Object[] { "parquet" },
+        new Object[] { "avro" },
+        new Object[] { "orc" }
+    };
+  }
 
   @BeforeClass
   public static void startSpark() {
-    TestParquetWrite.spark = SparkSession.builder().master("local[2]").getOrCreate();
+    TestSparkDataWrite.spark = SparkSession.builder().master("local[2]").getOrCreate();
   }
 
   @AfterClass
   public static void stopSpark() {
-    SparkSession currentSpark = TestParquetWrite.spark;
-    TestParquetWrite.spark = null;
+    SparkSession currentSpark = TestSparkDataWrite.spark;
+    TestSparkDataWrite.spark = null;
     currentSpark.stop();
   }
 
+  public TestSparkDataWrite(String format) {
+    this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+  }
+
   @Test
   public void testBasicWrite() throws IOException {
-    File parent = temp.newFolder("parquet");
+    File parent = temp.newFolder(format.toString());
     File location = new File(parent, "test");
 
     HadoopTables tables = new HadoopTables(CONF);
@@ -86,10 +104,10 @@ public class TestParquetWrite {
     );
 
     Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
-
     // TODO: incoming columns must be ordered according to the table's schema
     df.select("id", "data").write()
         .format("iceberg")
+        .option("write-format", format.toString())
         .mode("append")
         .save(location.toString());
 
@@ -104,20 +122,26 @@ public class TestParquetWrite {
     Assert.assertEquals("Result rows should match", expected, actual);
     for (ManifestFile manifest : table.currentSnapshot().manifests()) {
       for (DataFile file : ManifestReader.read(manifest, table.io())) {
-        Assert.assertNotNull("Split offsets not present", file.splitOffsets());
+        // TODO: avro not support split
+        if (!format.equals(FileFormat.AVRO)) {
+          Assert.assertNotNull("Split offsets not present", file.splitOffsets());
+        }
         Assert.assertEquals("Should have reported record count as 1", 1, file.recordCount());
-        Assert.assertNotNull("Column sizes metric not present", file.columnSizes());
-        Assert.assertNotNull("Counts metric not present", file.valueCounts());
-        Assert.assertNotNull("Null value counts metric not present", file.nullValueCounts());
-        Assert.assertNotNull("Lower bounds metric not present", file.lowerBounds());
-        Assert.assertNotNull("Upper bounds metric not present", file.upperBounds());
+        //TODO: append more metric info
+        if (format.equals(FileFormat.PARQUET)) {
+          Assert.assertNotNull("Column sizes metric not present", file.columnSizes());
+          Assert.assertNotNull("Counts metric not present", file.valueCounts());
+          Assert.assertNotNull("Null value counts metric not present", file.nullValueCounts());
+          Assert.assertNotNull("Lower bounds metric not present", file.lowerBounds());
+          Assert.assertNotNull("Upper bounds metric not present", file.upperBounds());
+        }
       }
     }
   }
 
   @Test
   public void testAppend() throws IOException {
-    File parent = temp.newFolder("parquet");
+    File parent = temp.newFolder(format.toString());
     File location = new File(parent, "test");
 
     HadoopTables tables = new HadoopTables(CONF);
@@ -143,11 +167,13 @@ public class TestParquetWrite {
 
     df.select("id", "data").write()
         .format("iceberg")
+        .option("write-format", format.toString())
         .mode("append")
         .save(location.toString());
 
     df.withColumn("id", df.col("id").plus(3)).select("id", "data").write()
         .format("iceberg")
+        .option("write-format", format.toString())
         .mode("append")
         .save(location.toString());
 
@@ -164,7 +190,7 @@ public class TestParquetWrite {
 
   @Test
   public void testOverwrite() throws IOException {
-    File parent = temp.newFolder("parquet");
+    File parent = temp.newFolder(format.toString());
     File location = new File(parent, "test");
 
     HadoopTables tables = new HadoopTables(CONF);
@@ -189,12 +215,14 @@ public class TestParquetWrite {
 
     df.select("id", "data").write()
         .format("iceberg")
+        .option("write-format", format.toString())
         .mode("append")
         .save(location.toString());
 
     // overwrite with 2*id to replace record 2, append 4 and 6
     df.withColumn("id", df.col("id").multiply(2)).select("id", "data").write()
         .format("iceberg")
+        .option("write-format", format.toString())
         .mode("overwrite")
         .save(location.toString());
 
@@ -211,7 +239,7 @@ public class TestParquetWrite {
 
   @Test
   public void testUnpartitionedOverwrite() throws IOException {
-    File parent = temp.newFolder("parquet");
+    File parent = temp.newFolder(format.toString());
     File location = new File(parent, "test");
 
     HadoopTables tables = new HadoopTables(CONF);
@@ -228,12 +256,14 @@ public class TestParquetWrite {
 
     df.select("id", "data").write()
         .format("iceberg")
+        .option("write-format", format.toString())
         .mode("append")
         .save(location.toString());
 
     // overwrite with the same data; should not produce two copies
     df.select("id", "data").write()
         .format("iceberg")
+        .option("write-format", format.toString())
         .mode("overwrite")
         .save(location.toString());
 
@@ -250,7 +280,7 @@ public class TestParquetWrite {
 
   @Test
   public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties() throws IOException {
-    File parent = temp.newFolder("parquet");
+    File parent = temp.newFolder(format.toString());
     File location = new File(parent, "test");
 
     HadoopTables tables = new HadoopTables(CONF);
@@ -270,6 +300,7 @@ public class TestParquetWrite {
 
     df.select("id", "data").write()
         .format("iceberg")
+        .option("write-format", format.toString())
         .mode("append")
         .save(location.toString());
 
@@ -289,13 +320,16 @@ public class TestParquetWrite {
         files.add(file);
       }
     }
-    Assert.assertEquals("Should have 4 DataFiles", 4, files.size());
-    Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
+    // TODO: ORC file now not support target file size
+    if (!format.equals(FileFormat.ORC)) {
+      Assert.assertEquals("Should have 4 DataFiles", 4, files.size());
+      Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
+    }
   }
 
   @Test
   public void testPartitionedCreateWithTargetFileSizeViaOption() throws IOException {
-    File parent = temp.newFolder("parquet");
+    File parent = temp.newFolder(format.toString());
     File location = new File(parent, "test");
 
     HadoopTables tables = new HadoopTables(CONF);
@@ -314,6 +348,7 @@ public class TestParquetWrite {
 
     df.select("id", "data").sort("data").write()
         .format("iceberg")
+        .option("write-format", format.toString())
         .mode("append")
         .option("target-file-size-bytes", 4) // ~4 bytes; low enough to trigger
         .save(location.toString());
@@ -334,13 +369,16 @@ public class TestParquetWrite {
         files.add(file);
       }
     }
-    Assert.assertEquals("Should have 8 DataFiles", 8, files.size());
-    Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
+    // TODO: ORC file now not support target file size
+    if (!format.equals(FileFormat.ORC)) {
+      Assert.assertEquals("Should have 8 DataFiles", 8, files.size());
+      Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
+    }
   }
 
   @Test
   public void testWriteProjection() throws IOException {
-    File parent = temp.newFolder("parquet");
+    File parent = temp.newFolder(format.toString());
     File location = new File(parent, "test");
 
     HadoopTables tables = new HadoopTables(CONF);
@@ -357,6 +395,7 @@ public class TestParquetWrite {
 
     df.select("id").write() // select only id column
         .format("iceberg")
+        .option("write-format", format.toString())
         .mode("append")
         .save(location.toString());
 
@@ -373,7 +412,7 @@ public class TestParquetWrite {
 
   @Test
   public void testWriteProjectionWithMiddle() throws IOException {
-    File parent = temp.newFolder("parquet");
+    File parent = temp.newFolder(format.toString());
     File location = new File(parent, "test");
 
     HadoopTables tables = new HadoopTables(CONF);
@@ -395,6 +434,7 @@ public class TestParquetWrite {
 
     df.select("c1", "c3").write()
         .format("iceberg")
+        .option("write-format", format.toString())
         .mode("append")
         .save(location.toString());
 

Reply via email to