This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 65319e9  Core: Support multiple specs in OutputFileFactory (#2858)
65319e9 is described below

commit 65319e911235a8cfed6f616955f1f92192d8a52f
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Mon Jul 26 11:57:38 2021 -1000

    Core: Support multiple specs in OutputFileFactory (#2858)
---
 .../org/apache/iceberg/io/OutputFileFactory.java   | 72 +++++++++++++++++--
 .../apache/iceberg/io/TestOutputFileFactory.java   | 81 ++++++++++++++++++++++
 .../org/apache/iceberg/io/TestAppenderFactory.java |  3 +-
 .../org/apache/iceberg/io/TestBaseTaskWriter.java  |  3 +-
 .../io/TestGenericSortedPosDeleteWriter.java       |  3 +-
 .../iceberg/io/TestTaskEqualityDeltaWriter.java    |  3 +-
 .../iceberg/mr/hive/HiveIcebergOutputFormat.java   | 14 ++--
 .../mr/hive/TestHiveIcebergOutputCommitter.java    | 16 ++---
 .../iceberg/spark/source/RowDataRewriter.java      | 10 ++-
 .../org/apache/iceberg/spark/source/Writer.java    |  2 +-
 .../apache/iceberg/spark/source/SparkWrite.java    |  2 +-
 11 files changed, 176 insertions(+), 33 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
index c510890..2607c4c 100644
--- a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
+++ b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.io;
 
+import java.util.Locale;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.FileFormat;
@@ -28,11 +29,14 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.encryption.EncryptionManager;
 
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+
 /**
  * Factory responsible for generating unique but recognizable data file names.
  */
 public class OutputFileFactory {
-  private final PartitionSpec spec;
+  private final PartitionSpec defaultSpec;
   private final FileFormat format;
   private final LocationProvider locations;
   private final FileIO io;
@@ -45,12 +49,13 @@ public class OutputFileFactory {
   private final String operationId;
   private final AtomicInteger fileCount = new AtomicInteger(0);
 
-  // TODO: expose a builder like OutputFileFactory.forTable()
+  @Deprecated
   public OutputFileFactory(Table table, FileFormat format, int partitionId, long taskId) {
     this(table.spec(), format, table.locationProvider(), table.io(), table.encryption(),
         partitionId, taskId, UUID.randomUUID().toString());
   }
 
+  @Deprecated
   public OutputFileFactory(Table table, PartitionSpec spec, FileFormat format, int partitionId, long taskId) {
     this(spec, format, table.locationProvider(), table.io(), table.encryption(),
         partitionId, taskId, UUID.randomUUID().toString());
@@ -65,7 +70,9 @@ public class OutputFileFactory {
    * @param encryptionManager Encryption manager used for encrypting the files
    * @param partitionId First part of the file name
    * @param taskId Second part of the file name
+   * @deprecated since 0.12.0, will be removed in 0.13.0; use {@link #builderFor(Table, int, long)} instead.
    */
+  @Deprecated
   public OutputFileFactory(PartitionSpec spec, FileFormat format, LocationProvider locations, FileIO io,
                            EncryptionManager encryptionManager, int partitionId, long taskId) {
     this(spec, format, locations, io, encryptionManager, partitionId, taskId, UUID.randomUUID().toString());
@@ -82,10 +89,12 @@ public class OutputFileFactory {
    * @param partitionId First part of the file name
    * @param taskId Second part of the file name
    * @param operationId Third part of the file name
+   * @deprecated since 0.12.0, will be removed in 0.13.0; use {@link #builderFor(Table, int, long)} instead.
    */
+  @Deprecated
   public OutputFileFactory(PartitionSpec spec, FileFormat format, LocationProvider locations, FileIO io,
                            EncryptionManager encryptionManager, int partitionId, long taskId, String operationId) {
-    this.spec = spec;
+    this.defaultSpec = spec;
     this.format = format;
     this.locations = locations;
     this.io = io;
@@ -95,13 +104,17 @@ public class OutputFileFactory {
     this.operationId = operationId;
   }
 
+  public static Builder builderFor(Table table, int partitionId, long taskId) {
+    return new Builder(table, partitionId, taskId);
+  }
+
   private String generateFilename() {
     return format.addExtension(
         String.format("%05d-%d-%s-%05d", partitionId, taskId, operationId, fileCount.incrementAndGet()));
   }
 
   /**
-   * Generates EncryptedOutputFile for UnpartitionedWriter.
+   * Generates an {@link EncryptedOutputFile} for unpartitioned writes.
    */
   public EncryptedOutputFile newOutputFile() {
     OutputFile file = io.newOutputFile(locations.newDataLocation(generateFilename()));
@@ -109,11 +122,60 @@ public class OutputFileFactory {
   }
 
   /**
-   * Generates EncryptedOutputFile for PartitionedWriter.
+   * Generates an {@link EncryptedOutputFile} for partitioned writes in the default spec.
    */
   public EncryptedOutputFile newOutputFile(StructLike partition) {
+    return newOutputFile(defaultSpec, partition);
+  }
+
+  /**
+   * Generates an {@link EncryptedOutputFile} for partitioned writes in a given spec.
+   */
+  public EncryptedOutputFile newOutputFile(PartitionSpec spec, StructLike partition) {
     String newDataLocation = locations.newDataLocation(spec, partition, generateFilename());
     OutputFile rawOutputFile = io.newOutputFile(newDataLocation);
     return encryptionManager.encrypt(rawOutputFile);
   }
+
+  public static class Builder {
+    private final Table table;
+    private final int partitionId;
+    private final long taskId;
+    private PartitionSpec defaultSpec;
+    private String operationId;
+    private FileFormat format;
+
+    private Builder(Table table, int partitionId, long taskId) {
+      this.table = table;
+      this.partitionId = partitionId;
+      this.taskId = taskId;
+      this.defaultSpec = table.spec();
+      this.operationId = UUID.randomUUID().toString();
+
+      String formatAsString = table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
+      this.format = FileFormat.valueOf(formatAsString.toUpperCase(Locale.ROOT));
+    }
+
+    public Builder defaultSpec(PartitionSpec newDefaultSpec) {
+      this.defaultSpec = newDefaultSpec;
+      return this;
+    }
+
+    public Builder operationId(String newOperationId) {
+      this.operationId = newOperationId;
+      return this;
+    }
+
+    public Builder format(FileFormat newFormat) {
+      this.format = newFormat;
+      return this;
+    }
+
+    public OutputFileFactory build() {
+      LocationProvider locations = table.locationProvider();
+      FileIO io = table.io();
+      EncryptionManager encryption = table.encryption();
+      return new OutputFileFactory(defaultSpec, format, locations, io, encryption, partitionId, taskId, operationId);
+    }
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/io/TestOutputFileFactory.java b/core/src/test/java/org/apache/iceberg/io/TestOutputFileFactory.java
new file mode 100644
index 0000000..1021083
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/io/TestOutputFileFactory.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestOutputFileFactory extends TableTestBase {
+
+  @Parameterized.Parameters(name = "formatVersion = {0}")
+  public static Object[] parameters() {
+    return new Object[] { 1, 2 };
+  }
+
+  private static final int PARTITION_ID = 1;
+  private static final int TASK_ID = 100;
+
+  public TestOutputFileFactory(int formatVersion) {
+    super(formatVersion);
+  }
+
+  @Test
+  public void testOutputFileFactoryWithCustomFormat() {
+    table.updateProperties()
+        .defaultFormat(FileFormat.ORC)
+        .commit();
+
+    OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, PARTITION_ID, TASK_ID)
+        .format(FileFormat.AVRO)
+        .build();
+
+    String location = fileFactory.newOutputFile().encryptingOutputFile().location();
+    Assert.assertEquals("File format should be correct", FileFormat.AVRO, FileFormat.fromFileName(location));
+  }
+
+  @Test
+  public void testOutputFileFactoryWithMultipleSpecs() {
+    OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, PARTITION_ID, TASK_ID)
+        .operationId("append")
+        .build();
+
+    EncryptedOutputFile unpartitionedFile = fileFactory.newOutputFile(PartitionSpec.unpartitioned(), null);
+    String unpartitionedFileLocation = unpartitionedFile.encryptingOutputFile().location();
+    Assert.assertTrue(unpartitionedFileLocation.endsWith("data/00001-100-append-00001.parquet"));
+
+    Record record = GenericRecord.create(table.schema()).copy(ImmutableMap.of("data", "aaa"));
+    PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
+    partitionKey.partition(record);
+    EncryptedOutputFile partitionedFile = fileFactory.newOutputFile(table.spec(), partitionKey);
+    String partitionedFileLocation = partitionedFile.encryptingOutputFile().location();
+    Assert.assertTrue(partitionedFileLocation.endsWith("data_bucket=7/00001-100-append-00002.parquet"));
+  }
+}
diff --git a/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java b/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java
index 44ac519..7df402e 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java
@@ -92,8 +92,7 @@ public abstract class TestAppenderFactory<T> extends TableTestBase {
       this.table = create(SCHEMA, PartitionSpec.unpartitioned());
     }
     this.partition = createPartitionKey();
-    this.fileFactory = new OutputFileFactory(table.spec(), format, table.locationProvider(), table.io(),
-        table.encryption(), 1, 1);
+    this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(format).build();
 
     table.updateProperties()
         .defaultFormat(format)
diff --git a/data/src/test/java/org/apache/iceberg/io/TestBaseTaskWriter.java b/data/src/test/java/org/apache/iceberg/io/TestBaseTaskWriter.java
index dbab582..89356b2 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestBaseTaskWriter.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestBaseTaskWriter.java
@@ -75,8 +75,7 @@ public class TestBaseTaskWriter extends TableTestBase {
     this.metadataDir = new File(tableDir, "metadata");
 
     this.table = create(SCHEMA, PartitionSpec.unpartitioned());
-    this.fileFactory = new OutputFileFactory(table.spec(), format, table.locationProvider(), table.io(),
-        table.encryption(), 1, 1);
+    this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(format).build();
 
     int firstFieldId = table.schema().findField("id").fieldId();
     int secondFieldId = table.schema().findField("data").fieldId();
diff --git a/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java b/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java
index ac2d937..b141e7c 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java
@@ -80,8 +80,7 @@ public class TestGenericSortedPosDeleteWriter extends TableTestBase {
     this.table = create(SCHEMA, PartitionSpec.unpartitioned());
     this.gRecord = GenericRecord.create(SCHEMA);
 
-    this.fileFactory = new OutputFileFactory(table.spec(), format, table.locationProvider(), table.io(),
-        table.encryption(), 1, 1);
+    this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(format).build();
 
     table.updateProperties()
         .defaultFormat(format)
diff --git a/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java b/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
index 33af5d6..6231a56 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
@@ -88,8 +88,7 @@ public class TestTaskEqualityDeltaWriter extends TableTestBase {
     this.metadataDir = new File(tableDir, "metadata");
 
     this.table = create(SCHEMA, PartitionSpec.unpartitioned());
-    this.fileFactory = new OutputFileFactory(table.spec(), format, table.locationProvider(), table.io(),
-        table.encryption(), 1, 1);
+    this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(format).build();
 
     this.idFieldId = table.schema().findField("id").fieldId();
     this.dataFieldId = table.schema().findField("data").fieldId();
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
index 0f45108..348580d 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
@@ -39,9 +39,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.mapred.Container;
@@ -78,11 +76,13 @@ public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Co
     long targetFileSize = PropertyUtil.propertyAsLong(table.properties(), TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
             TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
     FileIO io = table.io();
-    LocationProvider location = table.locationProvider();
-    EncryptionManager encryption = table.encryption();
-    OutputFileFactory outputFileFactory =
-        new OutputFileFactory(spec, fileFormat, location, io, encryption, taskAttemptID.getTaskID().getId(),
-            taskAttemptID.getId(), jc.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" + taskAttemptID.getJobID());
+    int partitionId = taskAttemptID.getTaskID().getId();
+    int taskId = taskAttemptID.getId();
+    String operationId = jc.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" + taskAttemptID.getJobID();
+    OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, partitionId, taskId)
+        .format(fileFormat)
+        .operationId(operationId)
+        .build();
     String tableName = jc.get(Catalogs.NAME);
 
     return new HiveIcebergRecordWriter(schema, spec, fileFormat,
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
index af8c383..7b4dce5 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
@@ -41,10 +41,8 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
@@ -266,18 +264,20 @@ public class TestHiveIcebergOutputCommitter {
 
     Table table = HiveIcebergStorageHandler.table(conf, name);
     FileIO io = table.io();
-    LocationProvider location = table.locationProvider();
-    EncryptionManager encryption = table.encryption();
     Schema schema = HiveIcebergStorageHandler.schema(conf);
     PartitionSpec spec = table.spec();
 
     for (int i = 0; i < taskNum; ++i) {
       List<Record> records = TestHelper.generateRandomRecords(schema, RECORD_NUM, i + attemptNum);
       TaskAttemptID taskId = new TaskAttemptID(JOB_ID.getJtIdentifier(), JOB_ID.getId(), TaskType.MAP, i, attemptNum);
-      OutputFileFactory outputFileFactory =
-          new OutputFileFactory(spec, FileFormat.PARQUET, location, io, encryption, taskId.getTaskID().getId(),
-              attemptNum, QUERY_ID + "-" + JOB_ID);
-      HiveIcebergRecordWriter testWriter = new HiveIcebergRecordWriter(schema, spec, FileFormat.PARQUET,
+      int partitionId = taskId.getTaskID().getId();
+      String operationId = QUERY_ID + "-" + JOB_ID;
+      FileFormat fileFormat = FileFormat.PARQUET;
+      OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, partitionId, attemptNum)
+          .format(fileFormat)
+          .operationId(operationId)
+          .build();
+      HiveIcebergRecordWriter testWriter = new HiveIcebergRecordWriter(schema, spec, fileFormat,
           new GenericAppenderFactory(schema), outputFileFactory, io, TARGET_FILE_SIZE,
           TezUtil.taskAttemptWrapper(taskId), conf.get(Catalogs.NAME));
 
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
index 8e97af1..63cc3a4 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
@@ -86,9 +86,13 @@ public class RowDataRewriter implements Serializable {
     RowDataReader dataReader = new RowDataReader(task, table, schema, caseSensitive);
 
     StructType structType = SparkSchemaUtil.convert(schema);
-    SparkAppenderFactory appenderFactory =
-        SparkAppenderFactory.builderFor(table, schema, structType).spec(spec).build();
-    OutputFileFactory fileFactory = new OutputFileFactory(table, spec, format, partitionId, taskId);
+    SparkAppenderFactory appenderFactory = SparkAppenderFactory.builderFor(table, schema, structType)
+        .spec(spec)
+        .build();
+    OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, partitionId, taskId)
+        .defaultSpec(spec)
+        .format(format)
+        .build();
 
     TaskWriter<InternalRow> writer;
     if (spec.isUnpartitioned()) {
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
index d03365f..4a7e0c2 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -263,7 +263,7 @@ class Writer implements DataSourceWriter {
     public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
       Table table = tableBroadcast.value();
 
-      OutputFileFactory fileFactory = new OutputFileFactory(table, format, partitionId, taskId);
+      OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, partitionId, taskId).format(format).build();
       SparkAppenderFactory appenderFactory = SparkAppenderFactory.builderFor(table, writeSchema, dsSchema).build();
 
       PartitionSpec spec = table.spec();
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 213fee4..a3da366 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -532,7 +532,7 @@ class SparkWrite {
     public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
       Table table = tableBroadcast.value();
 
-      OutputFileFactory fileFactory = new OutputFileFactory(table, format, partitionId, taskId);
+      OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, partitionId, taskId).format(format).build();
       SparkAppenderFactory appenderFactory = SparkAppenderFactory.builderFor(table, writeSchema, dsSchema).build();
 
       PartitionSpec spec = table.spec();

Reply via email to