This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 65319e9 Core: Support multiple specs in OutputFileFactory (#2858)
65319e9 is described below
commit 65319e911235a8cfed6f616955f1f92192d8a52f
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Mon Jul 26 11:57:38 2021 -1000
Core: Support multiple specs in OutputFileFactory (#2858)
---
.../org/apache/iceberg/io/OutputFileFactory.java | 72 +++++++++++++++++--
.../apache/iceberg/io/TestOutputFileFactory.java | 81 ++++++++++++++++++++++
.../org/apache/iceberg/io/TestAppenderFactory.java | 3 +-
.../org/apache/iceberg/io/TestBaseTaskWriter.java | 3 +-
.../io/TestGenericSortedPosDeleteWriter.java | 3 +-
.../iceberg/io/TestTaskEqualityDeltaWriter.java | 3 +-
.../iceberg/mr/hive/HiveIcebergOutputFormat.java | 14 ++--
.../mr/hive/TestHiveIcebergOutputCommitter.java | 16 ++---
.../iceberg/spark/source/RowDataRewriter.java | 10 ++-
.../org/apache/iceberg/spark/source/Writer.java | 2 +-
.../apache/iceberg/spark/source/SparkWrite.java | 2 +-
11 files changed, 176 insertions(+), 33 deletions(-)
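
For reviewers, a minimal usage sketch of the builder API this patch introduces (a sketch only; table, partitionId, taskId, operationId, and partitionKey are placeholder variables, not part of the change):

    // Build a factory from the table; format, operation ID, and default spec are optional overrides.
    OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, partitionId, taskId)
        .format(FileFormat.AVRO)       // defaults to the table's write.format.default property
        .operationId(operationId)      // defaults to a random UUID
        .defaultSpec(table.spec())     // spec used by newOutputFile(StructLike)
        .build();

    // Output files can now be requested for any partition spec, not only the default one.
    EncryptedOutputFile unpartitioned = fileFactory.newOutputFile(PartitionSpec.unpartitioned(), null);
    EncryptedOutputFile partitioned = fileFactory.newOutputFile(table.spec(), partitionKey);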
diff --git a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
index c510890..2607c4c 100644
--- a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
+++ b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.io;
+import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.FileFormat;
@@ -28,11 +29,14 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionManager;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+
/**
* Factory responsible for generating unique but recognizable data file names.
*/
public class OutputFileFactory {
- private final PartitionSpec spec;
+ private final PartitionSpec defaultSpec;
private final FileFormat format;
private final LocationProvider locations;
private final FileIO io;
@@ -45,12 +49,13 @@ public class OutputFileFactory {
private final String operationId;
private final AtomicInteger fileCount = new AtomicInteger(0);
- // TODO: expose a builder like OutputFileFactory.forTable()
+ @Deprecated
public OutputFileFactory(Table table, FileFormat format, int partitionId, long taskId) {
this(table.spec(), format, table.locationProvider(), table.io(), table.encryption(),
partitionId, taskId, UUID.randomUUID().toString());
}
+ @Deprecated
public OutputFileFactory(Table table, PartitionSpec spec, FileFormat format, int partitionId, long taskId) {
this(spec, format, table.locationProvider(), table.io(), table.encryption(),
partitionId, taskId, UUID.randomUUID().toString());
@@ -65,7 +70,9 @@ public class OutputFileFactory {
* @param encryptionManager Encryption manager used for encrypting the files
* @param partitionId First part of the file name
* @param taskId Second part of the file name
+ * @deprecated since 0.12.0, will be removed in 0.13.0; use {@link #builderFor(Table, int, long)} instead.
*/
+ @Deprecated
public OutputFileFactory(PartitionSpec spec, FileFormat format, LocationProvider locations, FileIO io,
EncryptionManager encryptionManager, int partitionId, long taskId) {
this(spec, format, locations, io, encryptionManager, partitionId, taskId, UUID.randomUUID().toString());
@@ -82,10 +89,12 @@ public class OutputFileFactory {
* @param partitionId First part of the file name
* @param taskId Second part of the file name
* @param operationId Third part of the file name
+ * @deprecated since 0.12.0, will be removed in 0.13.0; use {@link #builderFor(Table, int, long)} instead.
*/
+ @Deprecated
public OutputFileFactory(PartitionSpec spec, FileFormat format, LocationProvider locations, FileIO io,
EncryptionManager encryptionManager, int partitionId, long taskId, String operationId) {
- this.spec = spec;
+ this.defaultSpec = spec;
this.format = format;
this.locations = locations;
this.io = io;
@@ -95,13 +104,17 @@ public class OutputFileFactory {
this.operationId = operationId;
}
+ public static Builder builderFor(Table table, int partitionId, long taskId) {
+ return new Builder(table, partitionId, taskId);
+ }
+
private String generateFilename() {
return format.addExtension(
String.format("%05d-%d-%s-%05d", partitionId, taskId, operationId,
fileCount.incrementAndGet()));
}
/**
- * Generates EncryptedOutputFile for UnpartitionedWriter.
+ * Generates an {@link EncryptedOutputFile} for unpartitioned writes.
*/
public EncryptedOutputFile newOutputFile() {
OutputFile file = io.newOutputFile(locations.newDataLocation(generateFilename()));
@@ -109,11 +122,60 @@ public class OutputFileFactory {
}
/**
- * Generates EncryptedOutputFile for PartitionedWriter.
+ * Generates an {@link EncryptedOutputFile} for partitioned writes in the default spec.
*/
public EncryptedOutputFile newOutputFile(StructLike partition) {
+ return newOutputFile(defaultSpec, partition);
+ }
+
+ /**
+ * Generates an {@link EncryptedOutputFile} for partitioned writes in a given spec.
+ */
+ public EncryptedOutputFile newOutputFile(PartitionSpec spec, StructLike partition) {
String newDataLocation = locations.newDataLocation(spec, partition, generateFilename());
OutputFile rawOutputFile = io.newOutputFile(newDataLocation);
return encryptionManager.encrypt(rawOutputFile);
}
+
+ public static class Builder {
+ private final Table table;
+ private final int partitionId;
+ private final long taskId;
+ private PartitionSpec defaultSpec;
+ private String operationId;
+ private FileFormat format;
+
+ private Builder(Table table, int partitionId, long taskId) {
+ this.table = table;
+ this.partitionId = partitionId;
+ this.taskId = taskId;
+ this.defaultSpec = table.spec();
+ this.operationId = UUID.randomUUID().toString();
+
+ String formatAsString = table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
+ this.format = FileFormat.valueOf(formatAsString.toUpperCase(Locale.ROOT));
+ }
+
+ public Builder defaultSpec(PartitionSpec newDefaultSpec) {
+ this.defaultSpec = newDefaultSpec;
+ return this;
+ }
+
+ public Builder operationId(String newOperationId) {
+ this.operationId = newOperationId;
+ return this;
+ }
+
+ public Builder format(FileFormat newFormat) {
+ this.format = newFormat;
+ return this;
+ }
+
+ public OutputFileFactory build() {
+ LocationProvider locations = table.locationProvider();
+ FileIO io = table.io();
+ EncryptionManager encryption = table.encryption();
+ return new OutputFileFactory(defaultSpec, format, locations, io, encryption, partitionId, taskId, operationId);
+ }
+ }
}
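
The existing constructors above are kept but deprecated; the hunks below migrate each call site to the builder. The pattern is roughly as follows (a sketch using the identifiers from the affected files):

    // before (now @Deprecated)
    OutputFileFactory fileFactory = new OutputFileFactory(table, format, partitionId, taskId);

    // after
    OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, partitionId, taskId)
        .format(format)
        .build();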
diff --git a/core/src/test/java/org/apache/iceberg/io/TestOutputFileFactory.java b/core/src/test/java/org/apache/iceberg/io/TestOutputFileFactory.java
new file mode 100644
index 0000000..1021083
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/io/TestOutputFileFactory.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestOutputFileFactory extends TableTestBase {
+
+ @Parameterized.Parameters(name = "formatVersion = {0}")
+ public static Object[] parameters() {
+ return new Object[] { 1, 2 };
+ }
+
+ private static final int PARTITION_ID = 1;
+ private static final int TASK_ID = 100;
+
+ public TestOutputFileFactory(int formatVersion) {
+ super(formatVersion);
+ }
+
+ @Test
+ public void testOutputFileFactoryWithCustomFormat() {
+ table.updateProperties()
+ .defaultFormat(FileFormat.ORC)
+ .commit();
+
+ OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, PARTITION_ID, TASK_ID)
+ .format(FileFormat.AVRO)
+ .build();
+
+ String location = fileFactory.newOutputFile().encryptingOutputFile().location();
+ Assert.assertEquals("File format should be correct", FileFormat.AVRO, FileFormat.fromFileName(location));
+ }
+
+ @Test
+ public void testOutputFileFactoryWithMultipleSpecs() {
+ OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, PARTITION_ID, TASK_ID)
+ .operationId("append")
+ .build();
+
+ EncryptedOutputFile unpartitionedFile = fileFactory.newOutputFile(PartitionSpec.unpartitioned(), null);
+ String unpartitionedFileLocation = unpartitionedFile.encryptingOutputFile().location();
+ Assert.assertTrue(unpartitionedFileLocation.endsWith("data/00001-100-append-00001.parquet"));
+
+ Record record = GenericRecord.create(table.schema()).copy(ImmutableMap.of("data", "aaa"));
+ PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
+ partitionKey.partition(record);
+ EncryptedOutputFile partitionedFile = fileFactory.newOutputFile(table.spec(), partitionKey);
+ String partitionedFileLocation = partitionedFile.encryptingOutputFile().location();
+ Assert.assertTrue(partitionedFileLocation.endsWith("data_bucket=7/00001-100-append-00002.parquet"));
+ }
+}
diff --git a/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java b/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java
index 44ac519..7df402e 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java
@@ -92,8 +92,7 @@ public abstract class TestAppenderFactory<T> extends TableTestBase {
this.table = create(SCHEMA, PartitionSpec.unpartitioned());
}
this.partition = createPartitionKey();
- this.fileFactory = new OutputFileFactory(table.spec(), format, table.locationProvider(), table.io(),
- table.encryption(), 1, 1);
+ this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(format).build();
table.updateProperties()
.defaultFormat(format)
diff --git a/data/src/test/java/org/apache/iceberg/io/TestBaseTaskWriter.java b/data/src/test/java/org/apache/iceberg/io/TestBaseTaskWriter.java
index dbab582..89356b2 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestBaseTaskWriter.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestBaseTaskWriter.java
@@ -75,8 +75,7 @@ public class TestBaseTaskWriter extends TableTestBase {
this.metadataDir = new File(tableDir, "metadata");
this.table = create(SCHEMA, PartitionSpec.unpartitioned());
- this.fileFactory = new OutputFileFactory(table.spec(), format, table.locationProvider(), table.io(),
- table.encryption(), 1, 1);
+ this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(format).build();
int firstFieldId = table.schema().findField("id").fieldId();
int secondFieldId = table.schema().findField("data").fieldId();
diff --git a/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java b/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java
index ac2d937..b141e7c 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java
@@ -80,8 +80,7 @@ public class TestGenericSortedPosDeleteWriter extends TableTestBase {
this.table = create(SCHEMA, PartitionSpec.unpartitioned());
this.gRecord = GenericRecord.create(SCHEMA);
- this.fileFactory = new OutputFileFactory(table.spec(), format, table.locationProvider(), table.io(),
- table.encryption(), 1, 1);
+ this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(format).build();
table.updateProperties()
.defaultFormat(format)
diff --git a/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java b/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
index 33af5d6..6231a56 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
@@ -88,8 +88,7 @@ public class TestTaskEqualityDeltaWriter extends TableTestBase {
this.metadataDir = new File(tableDir, "metadata");
this.table = create(SCHEMA, PartitionSpec.unpartitioned());
- this.fileFactory = new OutputFileFactory(table.spec(), format, table.locationProvider(), table.io(),
- table.encryption(), 1, 1);
+ this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(format).build();
this.idFieldId = table.schema().findField("id").fieldId();
this.dataFieldId = table.schema().findField("data").fieldId();
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
index 0f45108..348580d 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
@@ -39,9 +39,7 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.mapred.Container;
@@ -78,11 +76,13 @@ public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Co
long targetFileSize = PropertyUtil.propertyAsLong(table.properties(), TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
FileIO io = table.io();
- LocationProvider location = table.locationProvider();
- EncryptionManager encryption = table.encryption();
- OutputFileFactory outputFileFactory =
- new OutputFileFactory(spec, fileFormat, location, io, encryption, taskAttemptID.getTaskID().getId(),
- taskAttemptID.getId(), jc.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" + taskAttemptID.getJobID());
+ int partitionId = taskAttemptID.getTaskID().getId();
+ int taskId = taskAttemptID.getId();
+ String operationId = jc.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" + taskAttemptID.getJobID();
+ OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, partitionId, taskId)
+ .format(fileFormat)
+ .operationId(operationId)
+ .build();
String tableName = jc.get(Catalogs.NAME);
return new HiveIcebergRecordWriter(schema, spec, fileFormat,
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
index af8c383..7b4dce5 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
@@ -41,10 +41,8 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
@@ -266,18 +264,20 @@ public class TestHiveIcebergOutputCommitter {
Table table = HiveIcebergStorageHandler.table(conf, name);
FileIO io = table.io();
- LocationProvider location = table.locationProvider();
- EncryptionManager encryption = table.encryption();
Schema schema = HiveIcebergStorageHandler.schema(conf);
PartitionSpec spec = table.spec();
for (int i = 0; i < taskNum; ++i) {
List<Record> records = TestHelper.generateRandomRecords(schema, RECORD_NUM, i + attemptNum);
TaskAttemptID taskId = new TaskAttemptID(JOB_ID.getJtIdentifier(), JOB_ID.getId(), TaskType.MAP, i, attemptNum);
- OutputFileFactory outputFileFactory =
- new OutputFileFactory(spec, FileFormat.PARQUET, location, io, encryption, taskId.getTaskID().getId(),
- attemptNum, QUERY_ID + "-" + JOB_ID);
- HiveIcebergRecordWriter testWriter = new HiveIcebergRecordWriter(schema, spec, FileFormat.PARQUET,
+ int partitionId = taskId.getTaskID().getId();
+ String operationId = QUERY_ID + "-" + JOB_ID;
+ FileFormat fileFormat = FileFormat.PARQUET;
+ OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, partitionId, attemptNum)
+ .format(fileFormat)
+ .operationId(operationId)
+ .build();
+ HiveIcebergRecordWriter testWriter = new HiveIcebergRecordWriter(schema, spec, fileFormat,
new GenericAppenderFactory(schema), outputFileFactory, io, TARGET_FILE_SIZE,
TezUtil.taskAttemptWrapper(taskId), conf.get(Catalogs.NAME));
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
index 8e97af1..63cc3a4 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
@@ -86,9 +86,13 @@ public class RowDataRewriter implements Serializable {
RowDataReader dataReader = new RowDataReader(task, table, schema, caseSensitive);
StructType structType = SparkSchemaUtil.convert(schema);
- SparkAppenderFactory appenderFactory =
- SparkAppenderFactory.builderFor(table, schema, structType).spec(spec).build();
- OutputFileFactory fileFactory = new OutputFileFactory(table, spec, format, partitionId, taskId);
+ SparkAppenderFactory appenderFactory = SparkAppenderFactory.builderFor(table, schema, structType)
+ .spec(spec)
+ .build();
+ OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, partitionId, taskId)
+ .defaultSpec(spec)
+ .format(format)
+ .build();
TaskWriter<InternalRow> writer;
if (spec.isUnpartitioned()) {
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
index d03365f..4a7e0c2 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -263,7 +263,7 @@ class Writer implements DataSourceWriter {
public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
Table table = tableBroadcast.value();
- OutputFileFactory fileFactory = new OutputFileFactory(table, format, partitionId, taskId);
+ OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, partitionId, taskId).format(format).build();
SparkAppenderFactory appenderFactory = SparkAppenderFactory.builderFor(table, writeSchema, dsSchema).build();
PartitionSpec spec = table.spec();
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 213fee4..a3da366 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -532,7 +532,7 @@ class SparkWrite {
public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
Table table = tableBroadcast.value();
- OutputFileFactory fileFactory = new OutputFileFactory(table, format, partitionId, taskId);
+ OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, partitionId, taskId).format(format).build();
SparkAppenderFactory appenderFactory = SparkAppenderFactory.builderFor(table, writeSchema, dsSchema).build();
PartitionSpec spec = table.spec();