This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 129c969 Hive: Add mapred InputFormat (#1192)
129c969 is described below
commit 129c9693683fe0f3937f467d1fd817cb602f8224
Author: Adrien Guillo <[email protected]>
AuthorDate: Wed Jul 22 10:06:21 2020 -0700
Hive: Add mapred InputFormat (#1192)
---
build.gradle | 24 +
.../org/apache/iceberg/mr/InputFormatConfig.java | 14 +
.../iceberg/mr/hive/HiveIcebergInputFormat.java | 85 ++++
.../apache/iceberg/mr/hive/HiveIcebergSplit.java | 93 ++++
.../{IcebergWritable.java => Container.java} | 34 +-
.../org/apache/iceberg/mr/mapred/IcebergSerDe.java | 4 +-
.../mr/mapred/MapredIcebergInputFormat.java | 191 +++++++
.../apache/iceberg/mr/mapred/TableResolver.java | 4 +-
.../iceberg/mr/mapreduce/IcebergInputFormat.java | 58 +--
.../apache/iceberg/mr/mapreduce/IcebergSplit.java | 89 ++++
.../mr/mapreduce/IcebergSplitContainer.java | 26 +
.../java/org/apache/iceberg/mr/TestHelper.java | 226 +++++++++
.../apache/iceberg/mr/TestIcebergInputFormats.java | 485 ++++++++++++++++++
.../mr/hive/TestHiveIcebergInputFormat.java | 169 +++++++
.../org/apache/iceberg/mr/mapred/TestHelpers.java | 119 -----
.../apache/iceberg/mr/mapred/TestIcebergSerDe.java | 5 +-
.../mr/mapreduce/TestIcebergInputFormat.java | 557 ---------------------
17 files changed, 1420 insertions(+), 763 deletions(-)
diff --git a/build.gradle b/build.gradle
index babd164..6510cc5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -79,6 +79,7 @@ subprojects {
all {
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
exclude group: 'org.mortbay.jetty'
+ exclude group: 'org.pentaho', module: 'pentaho-aggdesigner-algorithm'
resolutionStrategy {
force 'com.fasterxml.jackson.module:jackson-module-scala_2.11:2.10.2'
@@ -371,11 +372,34 @@ project(':iceberg-mr') {
compileOnly("org.apache.hadoop:hadoop-client") {
exclude group: 'org.apache.avro', module: 'avro'
}
+
+ compileOnly("org.apache.hive:hive-exec::core") {
+ exclude group: 'com.google.code.findbugs', module: 'jsr305'
+ exclude group: 'com.google.guava'
+ exclude group: 'com.google.protobuf', module: 'protobuf-java'
+ exclude group: 'org.apache.avro', module: 'avro'
+ exclude group: 'org.apache.calcite.avatica'
+ exclude group: 'org.apache.hive', module: 'hive-llap-tez'
+ exclude group: 'org.apache.logging.log4j'
+ exclude group: 'org.pentaho' // missing dependency
+ exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+ }
compileOnly("org.apache.hive:hive-serde")
testCompile project(path: ':iceberg-data', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-core', configuration: 'testArtifacts')
+
+ testCompile("org.apache.avro:avro:1.9.2")
+ testCompile("org.apache.calcite:calcite-core")
+ testCompile("com.esotericsoftware:kryo-shaded:4.0.2")
+ testCompile("com.fasterxml.jackson.core:jackson-annotations:2.6.5")
+ testCompile("com.klarna:hiverunner:5.2.1") {
+ exclude group: 'javax.jms', module: 'jms'
+ exclude group: 'org.apache.hive', module: 'hive-exec'
+ exclude group: 'org.codehaus.jettison', module: 'jettison'
+ exclude group: 'org.apache.calcite.avatica'
+ }
}
}
diff --git a/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
b/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
index f4b9aae..e266155 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
@@ -19,11 +19,13 @@
package org.apache.iceberg.mr;
+import java.io.File;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expression;
public class InputFormatConfig {
@@ -73,6 +75,10 @@ public class InputFormatConfig {
conf.setBoolean(LOCALITY, false);
}
+ public Configuration conf() {
+ return conf;
+ }
+
public ConfigBuilder filter(Expression expression) {
conf.set(FILTER_EXPRESSION,
SerializationUtil.serializeToBase64(expression));
return this;
@@ -88,6 +94,14 @@ public class InputFormatConfig {
return this;
}
+ public ConfigBuilder readFrom(TableIdentifier identifier) {
+ return readFrom(identifier.toString());
+ }
+
+ public ConfigBuilder readFrom(File path) {
+ return readFrom(path.toString());
+ }
+
public ConfigBuilder readFrom(String path) {
conf.set(TABLE_PATH, path);
return this;
diff --git
a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
new file mode 100644
index 0000000..0b9fd70
--- /dev/null
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
+import org.apache.iceberg.mr.mapred.TableResolver;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
+ implements
CombineHiveInputFormat.AvoidSplitCombination {
+
+ private transient Table table;
+ private transient Schema schema;
+
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException
{
+ table = TableResolver.resolveTableFromConfiguration(job);
+ schema = table.schema();
+
+ forwardConfigSettings(job);
+
+ return Arrays.stream(super.getSplits(job, numSplits))
+ .map(split -> new HiveIcebergSplit((IcebergSplit) split,
table.location()))
+ .toArray(InputSplit[]::new);
+ }
+
+ @Override
+ public RecordReader<Void, Container<Record>> getRecordReader(InputSplit
split, JobConf job,
+ Reporter
reporter) throws IOException {
+ // Since Hive passes a copy of `job` in `getSplits`, we need to forward
the conf settings again.
+ forwardConfigSettings(job);
+ return super.getRecordReader(split, job, reporter);
+ }
+
+ @Override
+ public boolean shouldSkipCombine(Path path, Configuration conf) {
+ return true;
+ }
+
+ /**
+ * Forward configuration settings to the underlying MR input format.
+ */
+ private void forwardConfigSettings(JobConf job) {
+ Preconditions.checkNotNull(table, "Table cannot be null");
+ Preconditions.checkNotNull(schema, "Schema cannot be null");
+
+ // Once mapred.TableResolver and mapreduce.TableResolver use the same
property for the location of the table
+ // (TABLE_LOCATION vs. TABLE_PATH), this line can be removed: see
https://github.com/apache/iceberg/issues/1155.
+ job.set(InputFormatConfig.TABLE_PATH, table.location());
+ job.set(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema));
+ }
+}
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java
b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java
new file mode 100644
index 0000000..e94ff20
--- /dev/null
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.iceberg.mr.SerializationUtil;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+
+// Hive requires file formats to return splits that are instances of
`FileSplit`.
+public class HiveIcebergSplit extends FileSplit implements
IcebergSplitContainer {
+
+ private IcebergSplit innerSplit;
+
+ // Hive uses the path name of a split to map it back to a partition
(`PartitionDesc`) or table description object
+ // (`TableDesc`) which specifies the relevant input format for reading the
files belonging to that partition or table.
+ // That way, `HiveInputFormat` and `CombineHiveInputFormat` can read files
with different file formats in the same
+ // MapReduce job and merge compatible splits together.
+ private String tableLocation;
+
+ // public no-argument constructor for deserialization
+ public HiveIcebergSplit() {}
+
+ HiveIcebergSplit(IcebergSplit split, String tableLocation) {
+ this.innerSplit = split;
+ this.tableLocation = tableLocation;
+ }
+
+ @Override
+ public IcebergSplit icebergSplit() {
+ return innerSplit;
+ }
+
+ @Override
+ public long getLength() {
+ return innerSplit.getLength();
+ }
+
+ @Override
+ public String[] getLocations() {
+ return innerSplit.getLocations();
+ }
+
+ @Override
+ public Path getPath() {
+ return new Path(tableLocation);
+ }
+
+ @Override
+ public long getStart() {
+ return 0;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ byte[] bytes = SerializationUtil.serializeToBytes(tableLocation);
+ out.writeInt(bytes.length);
+ out.write(bytes);
+
+ innerSplit.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ byte[] bytes = new byte[in.readInt()];
+ in.readFully(bytes);
+ tableLocation = SerializationUtil.deserializeFromBytes(bytes);
+
+ innerSplit = new IcebergSplit();
+ innerSplit.readFields(in);
+ }
+}
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergWritable.java
b/mr/src/main/java/org/apache/iceberg/mr/mapred/Container.java
similarity index 64%
rename from mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergWritable.java
rename to mr/src/main/java/org/apache/iceberg/mr/mapred/Container.java
index 1eb67f9..fda174e 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergWritable.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapred/Container.java
@@ -22,38 +22,22 @@ package org.apache.iceberg.mr.mapred;
import java.io.DataInput;
import java.io.DataOutput;
import org.apache.hadoop.io.Writable;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.Record;
/**
- * Wraps an Iceberg Record in a Writable which Hive can use in the SerDe.
+ * A simple container of objects that you can get and set.
+ *
+ * @param <T> the Java type of the object held by this container
*/
-public class IcebergWritable implements Writable {
-
- private Record record;
- private Schema schema;
-
- public IcebergWritable(Record record, Schema schema) {
- this.record = record;
- this.schema = schema;
- }
+public class Container<T> implements Writable {
- @SuppressWarnings("checkstyle:HiddenField")
- public void wrapRecord(Record record) {
- this.record = record;
- }
-
- public Record record() {
- return record;
- }
+ private T value;
- public Schema schema() {
- return schema;
+ public T get() {
+ return value;
}
- @SuppressWarnings("checkstyle:HiddenField")
- public void wrapSchema(Schema schema) {
- this.schema = schema;
+ public void set(T newValue) {
+ this.value = newValue;
}
@Override
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergSerDe.java
b/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergSerDe.java
index 871e0e6..f0ee139 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergSerDe.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergSerDe.java
@@ -55,7 +55,7 @@ public class IcebergSerDe extends AbstractSerDe {
@Override
public Class<? extends Writable> getSerializedClass() {
- return IcebergWritable.class;
+ return Container.class;
}
@Override
@@ -70,7 +70,7 @@ public class IcebergSerDe extends AbstractSerDe {
@Override
public Object deserialize(Writable writable) {
- return ((IcebergWritable) writable).record();
+ return ((Container<?>) writable).get();
}
@Override
diff --git
a/mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
b/mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
new file mode 100644
index 0000000..5ecba15
--- /dev/null
+++
b/mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+
+/**
+ * Generic MR v1 InputFormat API for Iceberg.
+ *
+ * @param <T> Java class of records constructed by Iceberg; default is {@link
Record}
+ */
+public class MapredIcebergInputFormat<T> implements InputFormat<Void,
Container<T>> {
+
+ private final org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T>
innerInputFormat;
+
+ public MapredIcebergInputFormat() {
+ this.innerInputFormat = new
org.apache.iceberg.mr.mapreduce.IcebergInputFormat<>();
+ }
+
+ /**
+ * Configures the {@code JobConf} to use the {@code
MapredIcebergInputFormat} and
+ * returns a helper to add further configuration.
+ *
+ * @param job the {@code JobConf} to configure
+ */
+ public static InputFormatConfig.ConfigBuilder configure(JobConf job) {
+ job.setInputFormat(MapredIcebergInputFormat.class);
+ return new InputFormatConfig.ConfigBuilder(job);
+ }
+
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException
{
+ return innerInputFormat.getSplits(newJobContext(job))
+ .stream()
+ .map(InputSplit.class::cast)
+ .toArray(InputSplit[]::new);
+ }
+
+ @Override
+ public RecordReader<Void, Container<T>> getRecordReader(InputSplit split,
JobConf job,
+ Reporter reporter)
throws IOException {
+ IcebergSplit icebergSplit = ((IcebergSplitContainer) split).icebergSplit();
+ return new MapredIcebergRecordReader<>(innerInputFormat, icebergSplit,
job, reporter);
+ }
+
+ private static final class MapredIcebergRecordReader<T> implements
RecordReader<Void, Container<T>> {
+
+ private final org.apache.hadoop.mapreduce.RecordReader<Void, T>
innerReader;
+ private final long splitLength; // for getPos()
+
+
MapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T>
mapreduceInputFormat,
+ IcebergSplit split, JobConf job, Reporter
reporter) throws IOException {
+ TaskAttemptContext context = newTaskAttemptContext(job, reporter);
+
+ try {
+ innerReader = mapreduceInputFormat.createRecordReader(split, context);
+ innerReader.initialize(split, context);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+
+ splitLength = split.getLength();
+ }
+
+ @Override
+ public boolean next(Void key, Container<T> value) throws IOException {
+ try {
+ if (innerReader.nextKeyValue()) {
+ value.set(innerReader.getCurrentValue());
+ return true;
+ }
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(ie);
+ }
+
+ return false;
+ }
+
+ @Override
+ public Void createKey() {
+ return null;
+ }
+
+ @Override
+ public Container<T> createValue() {
+ return new Container<>();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return (long) (splitLength * getProgress());
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ try {
+ return innerReader.getProgress();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (innerReader != null) {
+ innerReader.close();
+ }
+ }
+ }
+
+ private static JobContext newJobContext(JobConf job) {
+ JobID jobID = Optional.ofNullable(JobID.forName(job.get(JobContext.ID)))
+ .orElse(new JobID());
+
+ return new JobContextImpl(job, jobID);
+ }
+
+ private static TaskAttemptContext newTaskAttemptContext(JobConf job,
Reporter reporter) {
+ TaskAttemptID taskAttemptID =
Optional.ofNullable(TaskAttemptID.forName(job.get(JobContext.TASK_ATTEMPT_ID)))
+ .orElse(new TaskAttemptID());
+
+ return new TaskAttemptContextImpl(job, taskAttemptID,
toStatusReporter(reporter));
+ }
+
+ private static StatusReporter toStatusReporter(Reporter reporter) {
+ return new StatusReporter() {
+ @Override
+ public Counter getCounter(Enum<?> name) {
+ return reporter.getCounter(name);
+ }
+
+ @Override
+ public Counter getCounter(String group, String name) {
+ return reporter.getCounter(group, name);
+ }
+
+ @Override
+ public void progress() {
+ reporter.progress();
+ }
+
+ @Override
+ public float getProgress() {
+ return reporter.getProgress();
+ }
+
+ @Override
+ public void setStatus(String status) {
+ reporter.setStatus(status);
+ }
+ };
+ }
+}
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapred/TableResolver.java
b/mr/src/main/java/org/apache/iceberg/mr/mapred/TableResolver.java
index 7ee670d..c8c7414 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapred/TableResolver.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapred/TableResolver.java
@@ -29,7 +29,7 @@ import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-final class TableResolver {
+public final class TableResolver {
private TableResolver() {
}
@@ -42,7 +42,7 @@ final class TableResolver {
return resolveTableFromConfiguration(configuration);
}
- static Table resolveTableFromConfiguration(Configuration conf) throws
IOException {
+ public static Table resolveTableFromConfiguration(Configuration conf) throws
IOException {
//Default to HadoopTables
String catalogName = conf.get(InputFormatConfig.CATALOG_NAME,
InputFormatConfig.HADOOP_TABLES);
diff --git
a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 040194c..7c0a737 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -19,8 +19,6 @@
package org.apache.iceberg.mr.mapreduce;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
@@ -31,7 +29,6 @@ import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@@ -62,7 +59,6 @@ import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
@@ -87,8 +83,6 @@ import org.slf4j.LoggerFactory;
public class IcebergInputFormat<T> extends InputFormat<Void, T> {
private static final Logger LOG =
LoggerFactory.getLogger(IcebergInputFormat.class);
- private transient List<InputSplit> splits;
-
/**
* Configures the {@code Job} to use the {@code IcebergInputFormat} and
* returns a helper to add further configuration.
@@ -102,11 +96,6 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
@Override
public List<InputSplit> getSplits(JobContext context) {
- if (splits != null) {
- LOG.info("Returning cached splits: {}", splits.size());
- return splits;
- }
-
Configuration conf = context.getConfiguration();
Table table = findTable(conf);
TableScan scan = table.newScan()
@@ -134,7 +123,7 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
scan = scan.filter(filter);
}
- splits = Lists.newArrayList();
+ List<InputSplit> splits = Lists.newArrayList();
boolean applyResidual =
!conf.getBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, false);
InputFormatConfig.InMemoryDataModel model =
conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
InputFormatConfig.InMemoryDataModel.GENERIC);
@@ -186,7 +175,7 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
public void initialize(InputSplit split, TaskAttemptContext newContext) {
Configuration conf = newContext.getConfiguration();
// For now IcebergInputFormat does its own split planning and does not
accept FileSplit instances
- CombinedScanTask task = ((IcebergSplit) split).task;
+ CombinedScanTask task = ((IcebergSplit) split).task();
this.context = newContext;
this.tasks = task.files().iterator();
this.tableSchema =
SchemaParser.fromJson(conf.get(InputFormatConfig.TABLE_SCHEMA));
@@ -382,47 +371,4 @@ public class IcebergInputFormat<T> extends
InputFormat<Void, T> {
}
}
- static class IcebergSplit extends InputSplit implements Writable {
- static final String[] ANYWHERE = new String[]{"*"};
- private CombinedScanTask task;
- private transient String[] locations;
- private transient Configuration conf;
-
- IcebergSplit(Configuration conf, CombinedScanTask task) {
- this.task = task;
- this.conf = conf;
- }
-
- @Override
- public long getLength() {
- return task.files().stream().mapToLong(FileScanTask::length).sum();
- }
-
- @Override
- public String[] getLocations() {
- boolean localityPreferred = conf.getBoolean(InputFormatConfig.LOCALITY,
false);
- if (!localityPreferred) {
- return ANYWHERE;
- }
- if (locations != null) {
- return locations;
- }
- locations = Util.blockLocations(task, conf);
- return locations;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- byte[] data = SerializationUtil.serializeToBytes(this.task);
- out.writeInt(data.length);
- out.write(data);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- byte[] data = new byte[in.readInt()];
- in.readFully(data);
- this.task = SerializationUtil.deserializeFromBytes(data);
- }
- }
}
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
new file mode 100644
index 0000000..1e9a6b5
--- /dev/null
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.hadoop.Util;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.SerializationUtil;
+
+// Since this class extends `mapreduce.InputSplit` and implements
`mapred.InputSplit`, it can be returned by both MR v1
+// and v2 file formats.
+public class IcebergSplit extends InputSplit implements
org.apache.hadoop.mapred.InputSplit, IcebergSplitContainer {
+
+ public static final String[] ANYWHERE = new String[]{"*"};
+
+ private CombinedScanTask task;
+
+ private transient String[] locations;
+ private transient Configuration conf;
+
+ // public no-argument constructor for deserialization
+ public IcebergSplit() {}
+
+ IcebergSplit(Configuration conf, CombinedScanTask task) {
+ this.task = task;
+ this.conf = conf;
+ }
+
+ public CombinedScanTask task() {
+ return task;
+ }
+
+ @Override
+ public IcebergSplit icebergSplit() {
+ return this;
+ }
+
+ @Override
+ public long getLength() {
+ return task.files().stream().mapToLong(FileScanTask::length).sum();
+ }
+
+ @Override
+ public String[] getLocations() {
+ if (locations == null) {
+ boolean localityPreferred = conf.getBoolean(InputFormatConfig.LOCALITY,
false);
+ locations = localityPreferred ? Util.blockLocations(task, conf) :
ANYWHERE;
+ }
+
+ return locations;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ byte[] data = SerializationUtil.serializeToBytes(this.task);
+ out.writeInt(data.length);
+ out.write(data);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ byte[] data = new byte[in.readInt()];
+ in.readFully(data);
+ this.task = SerializationUtil.deserializeFromBytes(data);
+ }
+}
diff --git
a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplitContainer.java
b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplitContainer.java
new file mode 100644
index 0000000..c775437
--- /dev/null
+++
b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplitContainer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapreduce;
+
+public interface IcebergSplitContainer {
+
+ IcebergSplit icebergSplit();
+
+}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java
b/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java
new file mode 100644
index 0000000..0c96891
--- /dev/null
+++ b/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.rules.TemporaryFolder;
+
+public class TestHelper {
+
+ private final Configuration conf;
+ private final HadoopTables tables;
+ private final Schema schema;
+ private final PartitionSpec spec;
+ private final FileFormat fileFormat;
+ private final TemporaryFolder tmp;
+ private final File location;
+
+ private Table table;
+
+ public TestHelper(Configuration conf, HadoopTables tables, Schema schema,
PartitionSpec spec, FileFormat fileFormat,
+ TemporaryFolder tmp, File location) {
+ this.conf = conf;
+ this.tables = tables;
+ this.schema = schema;
+ this.spec = spec;
+ this.fileFormat = fileFormat;
+ this.tmp = tmp;
+ this.location = location;
+ }
+
+ private void setTable(Table table) {
+ this.table = table;
+ conf.set(InputFormatConfig.TABLE_SCHEMA,
SchemaParser.toJson(table.schema()));
+ }
+
+ public Table getTable() {
+ return table;
+ }
+
+ public Table createTable(Schema theSchema, PartitionSpec theSpec) {
+ Map<String, String> properties =
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name());
+ Table tbl = tables.create(theSchema, theSpec, properties,
location.toString());
+ setTable(tbl);
+ return tbl;
+ }
+
+ public Table createTable(Catalog catalog, TableIdentifier identifier) {
+ Map<String, String> properties =
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name());
+ Table tbl = catalog.createTable(identifier, schema, spec, properties);
+ setTable(tbl);
+ return tbl;
+ }
+
+ public Table createPartitionedTable() {
+ return createTable(schema, spec);
+ }
+
+ public Table createUnpartitionedTable() {
+ return createTable(schema, PartitionSpec.unpartitioned());
+ }
+
+
+ public List<Record> generateRandomRecords(int num, long seed) {
+ Preconditions.checkNotNull(table, "table not set");
+ return generateRandomRecords(table.schema(), num, seed);
+ }
+
+ public static List<Record> generateRandomRecords(Schema schema, int num,
long seed) {
+ return RandomGenericData.generate(schema, num, seed);
+ }
+
+ public void appendToTable(DataFile... dataFiles) {
+ Preconditions.checkNotNull(table, "table not set");
+
+ AppendFiles append = table.newAppend();
+
+ for (DataFile dataFile : dataFiles) {
+ append = append.appendFile(dataFile);
+ }
+
+ append.commit();
+ }
+
+ public void appendToTable(StructLike partition, List<Record> records) throws
IOException {
+ appendToTable(writeFile(partition, records));
+ }
+
+ public DataFile writeFile(StructLike partition, List<Record> records) throws
IOException {
+ Preconditions.checkNotNull(table, "table not set");
+ return writeFile(table, partition, records, fileFormat, tmp.newFile());
+ }
+
+ public static DataFile writeFile(Table table, StructLike partition,
List<Record> records, FileFormat fileFormat,
+ File file) throws IOException {
+ Assert.assertTrue(file.delete());
+
+ FileAppender<Record> appender;
+
+ switch (fileFormat) {
+ case AVRO:
+ appender = Avro.write(Files.localOutput(file))
+ .schema(table.schema())
+ .createWriterFunc(DataWriter::create)
+ .named(fileFormat.name())
+ .build();
+ break;
+
+ case PARQUET:
+ appender = Parquet.write(Files.localOutput(file))
+ .schema(table.schema())
+ .createWriterFunc(GenericParquetWriter::buildWriter)
+ .named(fileFormat.name())
+ .build();
+ break;
+
+ case ORC:
+ appender = ORC.write(Files.localOutput(file))
+ .schema(table.schema())
+ .createWriterFunc(GenericOrcWriter::buildWriter)
+ .build();
+ break;
+
+ default:
+ throw new UnsupportedOperationException("Cannot write format: " +
fileFormat);
+ }
+
+ try {
+ appender.addAll(records);
+ } finally {
+ appender.close();
+ }
+
+ DataFiles.Builder builder = DataFiles.builder(table.spec())
+ .withPath(file.toString())
+ .withFormat(fileFormat)
+ .withFileSizeInBytes(file.length())
+ .withMetrics(appender.metrics());
+
+ if (partition != null) {
+ builder.withPartition(partition);
+ }
+
+ return builder.build();
+ }
+
+ public static class RecordsBuilder {
+
+ private final List<Record> records = new ArrayList<Record>();
+ private final Schema schema;
+
+ private RecordsBuilder(Schema schema) {
+ this.schema = schema;
+ }
+
+ public RecordsBuilder add(Object... values) {
+ Preconditions.checkArgument(schema.columns().size() == values.length);
+
+ GenericRecord record = GenericRecord.create(schema);
+
+ for (int i = 0; i < values.length; i++) {
+ record.set(i, values[i]);
+ }
+
+ records.add(record);
+ return this;
+ }
+
+ public List<Record> build() {
+ return Collections.unmodifiableList(records);
+ }
+
+ public static RecordsBuilder newInstance(Schema schema) {
+ return new RecordsBuilder(schema);
+ }
+ }
+}
diff --git
a/mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
b/mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
new file mode 100644
index 0000000..4470aec
--- /dev/null
+++ b/mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TestHelpers.Row;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
+import org.apache.iceberg.mr.mapreduce.IcebergInputFormat;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+@RunWith(Parameterized.class)
+public class TestIcebergInputFormats {
+
+ private static final List<TestInputFormat.Factory<Record>>
TESTED_INPUT_FORMATS = ImmutableList.of(
+ TestInputFormat.newFactory("IcebergInputFormat",
TestIcebergInputFormat::create),
+ TestInputFormat.newFactory("MapredIcebergInputFormat",
TestMapredIcebergInputFormat::create));
+
+ private static final List<String> TESTED_FILE_FORMATS =
ImmutableList.of("avro", "orc", "parquet");
+
+ private static final Schema SCHEMA = new Schema(
+ required(1, "data", Types.StringType.get()),
+ required(2, "id", Types.LongType.get()),
+ required(3, "date", Types.StringType.get()));
+
+ private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
+ .identity("date")
+ .bucket("id", 1)
+ .build();
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ // before variables
+ private Configuration conf;
+ private TestHelper helper;
+ private InputFormatConfig.ConfigBuilder builder;
+
+ // parametrized variables
+ private final TestInputFormat.Factory<Record> testInputFormat;
+ private final FileFormat fileFormat;
+
+ @Before
+ public void before() throws IOException {
+ conf = new Configuration();
+ HadoopTables tables = new HadoopTables(conf);
+
+ File location = temp.newFolder(testInputFormat.name(), fileFormat.name());
+ Assert.assertTrue(location.delete());
+
+ helper = new TestHelper(conf, tables, SCHEMA, SPEC, fileFormat, temp,
location);
+ builder = new InputFormatConfig.ConfigBuilder(conf).readFrom(location);
+ }
+
+ @Parameterized.Parameters
+ public static Object[][] parameters() {
+ Object[][] parameters = new Object[TESTED_INPUT_FORMATS.size() *
TESTED_FILE_FORMATS.size()][2];
+
+ int idx = 0;
+
+ for (TestInputFormat.Factory<Record> inputFormat : TESTED_INPUT_FORMATS) {
+ for (String fileFormat : TESTED_FILE_FORMATS) {
+ parameters[idx++] = new Object[] {inputFormat, fileFormat};
+ }
+ }
+
+ return parameters;
+ }
+
+ public TestIcebergInputFormats(TestInputFormat.Factory<Record>
testInputFormat, String fileFormat) {
+ this.testInputFormat = testInputFormat;
+ this.fileFormat =
FileFormat.valueOf(fileFormat.toUpperCase(Locale.ENGLISH));
+ }
+
+ @Test
+ public void testUnpartitionedTable() throws Exception {
+ helper.createUnpartitionedTable();
+ List<Record> expectedRecords = helper.generateRandomRecords(1, 0L);
+ helper.appendToTable(null, expectedRecords);
+
+ testInputFormat.create(builder.conf()).validate(expectedRecords);
+ }
+
+ @Test
+ public void testPartitionedTable() throws Exception {
+ helper.createPartitionedTable();
+ List<Record> expectedRecords = helper.generateRandomRecords(1, 0L);
+ expectedRecords.get(0).set(2, "2020-03-20");
+ helper.appendToTable(Row.of("2020-03-20", 0), expectedRecords);
+
+ testInputFormat.create(builder.conf()).validate(expectedRecords);
+ }
+
+ @Test
+ public void testFilterExp() throws Exception {
+ helper.createPartitionedTable();
+
+ List<Record> expectedRecords = helper.generateRandomRecords(2, 0L);
+ expectedRecords.get(0).set(2, "2020-03-20");
+ expectedRecords.get(1).set(2, "2020-03-20");
+
+ DataFile dataFile1 = helper.writeFile(Row.of("2020-03-20", 0),
expectedRecords);
+ DataFile dataFile2 = helper.writeFile(Row.of("2020-03-21", 0),
helper.generateRandomRecords(2, 0L));
+ helper.appendToTable(dataFile1, dataFile2);
+
+ builder.filter(Expressions.equal("date", "2020-03-20"));
+ testInputFormat.create(builder.conf()).validate(expectedRecords);
+ }
+
+ @Test
+ public void testResiduals() throws Exception {
+ helper.createPartitionedTable();
+
+ List<Record> writeRecords = helper.generateRandomRecords(2, 0L);
+ writeRecords.get(0).set(1, 123L);
+ writeRecords.get(0).set(2, "2020-03-20");
+ writeRecords.get(1).set(1, 456L);
+ writeRecords.get(1).set(2, "2020-03-20");
+
+ List<Record> expectedRecords = new ArrayList<>();
+ expectedRecords.add(writeRecords.get(0));
+
+ DataFile dataFile1 = helper.writeFile(Row.of("2020-03-20", 0),
writeRecords);
+ DataFile dataFile2 = helper.writeFile(Row.of("2020-03-21", 0),
helper.generateRandomRecords(2, 0L));
+ helper.appendToTable(dataFile1, dataFile2);
+
+ builder.filter(Expressions.and(
+ Expressions.equal("date", "2020-03-20"),
+ Expressions.equal("id", 123)));
+ testInputFormat.create(builder.conf()).validate(expectedRecords);
+
+ // skip residual filtering
+ builder.skipResidualFiltering();
+ testInputFormat.create(builder.conf()).validate(writeRecords);
+ }
+
+ @Test
+ public void testFailedResidualFiltering() throws Exception {
+ helper.createPartitionedTable();
+
+ List<Record> expectedRecords = helper.generateRandomRecords(2, 0L);
+ expectedRecords.get(0).set(2, "2020-03-20");
+ expectedRecords.get(1).set(2, "2020-03-20");
+
+ helper.appendToTable(Row.of("2020-03-20", 0), expectedRecords);
+
+ builder.useHiveRows()
+ .filter(Expressions.and(
+ Expressions.equal("date", "2020-03-20"),
+ Expressions.equal("id", 0)));
+
+ AssertHelpers.assertThrows(
+ "Residuals are not evaluated today for Iceberg Generics In memory
model of HIVE",
+ UnsupportedOperationException.class, "Filter expression
ref(name=\"id\") == 0 is not completely satisfied.",
+ () -> testInputFormat.create(builder.conf()));
+
+ builder.usePigTuples();
+
+ AssertHelpers.assertThrows(
+ "Residuals are not evaluated today for Iceberg Generics In memory
model of PIG",
+ UnsupportedOperationException.class, "Filter expression
ref(name=\"id\") == 0 is not completely satisfied.",
+ () -> testInputFormat.create(builder.conf()));
+ }
+
+ @Test
+ public void testProjection() throws Exception {
+ helper.createPartitionedTable();
+ List<Record> inputRecords = helper.generateRandomRecords(1, 0L);
+ helper.appendToTable(Row.of("2020-03-20", 0), inputRecords);
+
+ Schema projection = TypeUtil.select(SCHEMA, ImmutableSet.of(1));
+ builder.project(projection);
+
+ List<Record> outputRecords =
testInputFormat.create(builder.conf()).getRecords();
+
+ Assert.assertEquals(inputRecords.size(), outputRecords.size());
+ Assert.assertEquals(projection.asStruct(), outputRecords.get(0).struct());
+ }
+
+ private static final Schema LOG_SCHEMA = new Schema(
+ Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "date", Types.StringType.get()),
+ Types.NestedField.optional(3, "level", Types.StringType.get()),
+ Types.NestedField.optional(4, "message", Types.StringType.get())
+ );
+
+ private static final PartitionSpec IDENTITY_PARTITION_SPEC =
+
PartitionSpec.builderFor(LOG_SCHEMA).identity("date").identity("level").build();
+
+ @Test
+ public void testIdentityPartitionProjections() throws Exception {
+ helper.createTable(LOG_SCHEMA, IDENTITY_PARTITION_SPEC);
+ List<Record> inputRecords = helper.generateRandomRecords(10, 0L);
+
+ Integer idx = 0;
+ AppendFiles append = helper.getTable().newAppend();
+ for (Record record : inputRecords) {
+ record.set(1, "2020-03-2" + idx);
+ record.set(2, idx.toString());
+ append.appendFile(helper.writeFile(Row.of("2020-03-2" + idx,
idx.toString()), ImmutableList.of(record)));
+ idx += 1;
+ }
+ append.commit();
+
+ // individual fields
+ validateIdentityPartitionProjections(withColumns("date"), inputRecords);
+ validateIdentityPartitionProjections(withColumns("level"), inputRecords);
+ validateIdentityPartitionProjections(withColumns("message"), inputRecords);
+ validateIdentityPartitionProjections(withColumns("id"), inputRecords);
+ // field pairs
+ validateIdentityPartitionProjections(withColumns("date", "message"),
inputRecords);
+ validateIdentityPartitionProjections(withColumns("level", "message"),
inputRecords);
+ validateIdentityPartitionProjections(withColumns("date", "level"),
inputRecords);
+ // out-of-order pairs
+ validateIdentityPartitionProjections(withColumns("message", "date"),
inputRecords);
+ validateIdentityPartitionProjections(withColumns("message", "level"),
inputRecords);
+ validateIdentityPartitionProjections(withColumns("level", "date"),
inputRecords);
+ // full projection
+ validateIdentityPartitionProjections(LOG_SCHEMA, inputRecords);
+ // out-of-order triplets
+ validateIdentityPartitionProjections(withColumns("date", "level",
"message"), inputRecords);
+ validateIdentityPartitionProjections(withColumns("level", "date",
"message"), inputRecords);
+ validateIdentityPartitionProjections(withColumns("date", "message",
"level"), inputRecords);
+ validateIdentityPartitionProjections(withColumns("level", "message",
"date"), inputRecords);
+ validateIdentityPartitionProjections(withColumns("message", "date",
"level"), inputRecords);
+ validateIdentityPartitionProjections(withColumns("message", "level",
"date"), inputRecords);
+ }
+
+ private static Schema withColumns(String... names) {
+ Map<String, Integer> indexByName =
TypeUtil.indexByName(LOG_SCHEMA.asStruct());
+ Set<Integer> projectedIds = Sets.newHashSet();
+ for (String name : names) {
+ projectedIds.add(indexByName.get(name));
+ }
+ return TypeUtil.select(LOG_SCHEMA, projectedIds);
+ }
+
+ private void validateIdentityPartitionProjections(Schema projectedSchema,
List<Record> inputRecords) {
+ builder.project(projectedSchema);
+ List<Record> actualRecords =
testInputFormat.create(builder.conf()).getRecords();
+
+ Set<String> fieldNames =
TypeUtil.indexByName(projectedSchema.asStruct()).keySet();
+
+ for (int pos = 0; pos < inputRecords.size(); pos++) {
+ Record inputRecord = inputRecords.get(pos);
+ Record actualRecord = actualRecords.get(pos);
+ Assert.assertEquals("Projected schema should match",
projectedSchema.asStruct(), actualRecord.struct());
+
+ for (String name : fieldNames) {
+ Assert.assertEquals(
+ "Projected field " + name + " should match",
inputRecord.getField(name), actualRecord.getField(name));
+ }
+ }
+ }
+
+ @Test
+ public void testSnapshotReads() throws Exception {
+ helper.createUnpartitionedTable();
+
+ List<Record> expectedRecords = helper.generateRandomRecords(1, 0L);
+ helper.appendToTable(null, expectedRecords);
+ long snapshotId = helper.getTable().currentSnapshot().snapshotId();
+
+ helper.appendToTable(null, helper.generateRandomRecords(1, 0L));
+
+ builder.snapshotId(snapshotId);
+ testInputFormat.create(builder.conf()).validate(expectedRecords);
+ }
+
+ @Test
+ public void testLocality() throws Exception {
+ helper.createUnpartitionedTable();
+ List<Record> expectedRecords = helper.generateRandomRecords(1, 0L);
+ helper.appendToTable(null, expectedRecords);
+
+ for (InputSplit split :
testInputFormat.create(builder.conf()).getSplits()) {
+ Assert.assertArrayEquals(IcebergSplit.ANYWHERE, split.getLocations());
+ }
+
+ builder.preferLocality();
+
+ for (InputSplit split :
testInputFormat.create(builder.conf()).getSplits()) {
+ Assert.assertArrayEquals(new String[]{"localhost"},
split.getLocations());
+ }
+ }
+
+ public static class HadoopCatalogLoader implements Function<Configuration,
Catalog> {
+ @Override
+ public Catalog apply(Configuration conf) {
+ return new HadoopCatalog(conf, conf.get("warehouse.location"));
+ }
+ }
+
+ @Test
+ public void testCustomCatalog() throws IOException {
+ conf.set("warehouse.location",
temp.newFolder("hadoop_catalog").getAbsolutePath());
+
+ Catalog catalog = new HadoopCatalogLoader().apply(conf);
+ TableIdentifier identifier = TableIdentifier.of("db", "t");
+ helper.createTable(catalog, identifier);
+
+ List<Record> expectedRecords = helper.generateRandomRecords(1, 0L);
+ expectedRecords.get(0).set(2, "2020-03-20");
+ helper.appendToTable(Row.of("2020-03-20", 0), expectedRecords);
+
+ builder.catalogFunc(HadoopCatalogLoader.class)
+ .readFrom(identifier);
+
+ testInputFormat.create(builder.conf()).validate(expectedRecords);
+ }
+
+ private abstract static class TestInputFormat<T> {
+
+ private final List<IcebergSplit> splits;
+ private final List<T> records;
+
+ private TestInputFormat(List<IcebergSplit> splits, List<T> records) {
+ this.splits = splits;
+ this.records = records;
+ }
+
+ public List<T> getRecords() {
+ return records;
+ }
+
+ public List<IcebergSplit> getSplits() {
+ return splits;
+ }
+
+ public void validate(List<T> expected) {
+ Assert.assertEquals(expected, records);
+ }
+
+ public interface Factory<T> {
+ String name();
+ TestInputFormat<T> create(Configuration conf);
+ }
+
+ public static <T> Factory<T> newFactory(String name,
Function<Configuration, TestInputFormat<T>> function) {
+ return new Factory<T>() {
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public TestInputFormat<T> create(Configuration conf) {
+ return function.apply(conf);
+ }
+ };
+ }
+ }
+
+ private static final class TestMapredIcebergInputFormat<T> extends
TestInputFormat<T> {
+
+ private TestMapredIcebergInputFormat(List<IcebergSplit> splits, List<T>
records) {
+ super(splits, records);
+ }
+
+ private static <T> TestMapredIcebergInputFormat<T> create(Configuration
conf) {
+ JobConf job = new JobConf(conf);
+ MapredIcebergInputFormat<T> inputFormat = new
MapredIcebergInputFormat<>();
+
+ try {
+ org.apache.hadoop.mapred.InputSplit[] splits =
inputFormat.getSplits(job, 1);
+
+ List<IcebergSplit> iceSplits = new ArrayList<>(splits.length);
+ List<T> records = new ArrayList<>();
+
+ for (org.apache.hadoop.mapred.InputSplit split : splits) {
+ iceSplits.add((IcebergSplit) split);
+ org.apache.hadoop.mapred.RecordReader<Void, Container<T>>
+ reader = inputFormat.getRecordReader(split, job,
Reporter.NULL);
+
+ try {
+ Container<T> container = reader.createValue();
+
+ while (reader.next(null, container)) {
+ records.add(container.get());
+ }
+ } finally {
+ reader.close();
+ }
+ }
+
+ return new TestMapredIcebergInputFormat<>(iceSplits, records);
+ } catch (IOException ioe) {
+ throw new UncheckedIOException(ioe);
+ }
+ }
+ }
+
+ private static final class TestIcebergInputFormat<T> extends
TestInputFormat<T> {
+
+ private TestIcebergInputFormat(List<IcebergSplit> splits, List<T> records)
{
+ super(splits, records);
+ }
+
+ private static <T> TestIcebergInputFormat<T> create(Configuration conf) {
+ TaskAttemptContext context = new TaskAttemptContextImpl(conf, new
TaskAttemptID());
+ IcebergInputFormat<T> inputFormat = new IcebergInputFormat<>();
+ List<InputSplit> splits = inputFormat.getSplits(context);
+
+ List<IcebergSplit> iceSplits = new ArrayList<>(splits.size());
+ List<T> records = new ArrayList<>();
+
+ for (InputSplit split : splits) {
+ iceSplits.add((IcebergSplit) split);
+
+ try (RecordReader<Void, T> reader =
inputFormat.createRecordReader(split, context)) {
+ reader.initialize(split, context);
+
+ while (reader.nextKeyValue()) {
+ records.add(reader.getCurrentValue());
+ }
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie);
+ } catch (IOException ioe) {
+ throw new UncheckedIOException(ioe);
+ }
+ }
+
+ return new TestIcebergInputFormat<>(iceSplits, records);
+ }
+ }
+}
diff --git
a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInputFormat.java
b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInputFormat.java
new file mode 100644
index 0000000..6443d45
--- /dev/null
+++
b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInputFormat.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.StandaloneHiveRunner;
+import com.klarna.hiverunner.annotations.HiveSQL;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.mr.TestHelper;
+import org.apache.iceberg.mr.mapred.IcebergSerDe;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+@RunWith(StandaloneHiveRunner.class)
+public class TestHiveIcebergInputFormat {
+
+ @HiveSQL(files = {}, autoStart = true)
+ private HiveShell shell;
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ private static final Schema CUSTOMER_SCHEMA = new Schema(
+ required(1, "customer_id", Types.LongType.get()),
+ required(2, "first_name", Types.StringType.get())
+ );
+
+ private static final List<Record> CUSTOMER_RECORDS =
TestHelper.RecordsBuilder.newInstance(CUSTOMER_SCHEMA)
+ .add(0L, "Alice")
+ .add(1L, "Bob")
+ .add(2L, "Trudy")
+ .build();
+
+ private static final Schema ORDER_SCHEMA = new Schema(
+ required(1, "order_id", Types.LongType.get()),
+ required(2, "customer_id", Types.LongType.get()),
+ required(3, "total", Types.DoubleType.get()));
+
+ private static final List<Record> ORDER_RECORDS =
TestHelper.RecordsBuilder.newInstance(ORDER_SCHEMA)
+ .add(100L, 0L, 11.11d)
+ .add(101L, 0L, 22.22d)
+ .add(102L, 1L, 33.33d)
+ .build();
+
+ // before variables
+ private HadoopTables tables;
+ private Table customerTable;
+ private Table orderTable;
+
+ @Before
+ public void before() throws IOException {
+ Configuration conf = new Configuration();
+ tables = new HadoopTables(conf);
+
+ File customerLocation = temp.newFolder("customers");
+ Assert.assertTrue(customerLocation.delete());
+
+ TestHelper customerHelper = new TestHelper(
+ conf, tables, CUSTOMER_SCHEMA, PartitionSpec.unpartitioned(),
FileFormat.PARQUET, temp, customerLocation);
+
+ customerTable = customerHelper.createUnpartitionedTable();
+ customerHelper.appendToTable(customerHelper.writeFile(null,
CUSTOMER_RECORDS));
+
+ File orderLocation = temp.newFolder("orders");
+ Assert.assertTrue(orderLocation.delete());
+
+ TestHelper orderHelper = new TestHelper(
+ conf, tables, ORDER_SCHEMA, PartitionSpec.unpartitioned(),
FileFormat.PARQUET, temp, orderLocation);
+
+ orderTable = orderHelper.createUnpartitionedTable();
+ orderHelper.appendToTable(orderHelper.writeFile(null, ORDER_RECORDS));
+ }
+
+ @Test
+ public void testScanEmptyTable() throws IOException {
+ File emptyLocation = temp.newFolder("empty");
+ Assert.assertTrue(emptyLocation.delete());
+
+ Schema emptySchema = new Schema(required(1, "empty",
Types.StringType.get()));
+ Table emptyTable = tables.create(
+ emptySchema, PartitionSpec.unpartitioned(),
Collections.emptyMap(), emptyLocation.toString());
+ createHiveTable("empty", emptyTable.location());
+
+ List<Object[]> rows = shell.executeStatement("SELECT * FROM
default.empty");
+ Assert.assertEquals(0, rows.size());
+ }
+
+ @Test
+ public void testScanTable() {
+ createHiveTable("customers", customerTable.location());
+
+ // Single fetch task: no MR job.
+ List<Object[]> rows = shell.executeStatement("SELECT * FROM
default.customers");
+
+ Assert.assertEquals(3, rows.size());
+ Assert.assertArrayEquals(new Object[] {0L, "Alice"}, rows.get(0));
+ Assert.assertArrayEquals(new Object[] {1L, "Bob"}, rows.get(1));
+ Assert.assertArrayEquals(new Object[] {2L, "Trudy"}, rows.get(2));
+
+ // Adding the ORDER BY clause will cause Hive to spawn a local MR job this
time.
+ List<Object[]> descRows = shell.executeStatement("SELECT * FROM
default.customers ORDER BY customer_id DESC");
+
+ Assert.assertEquals(3, rows.size());
+ Assert.assertArrayEquals(new Object[] {2L, "Trudy"}, descRows.get(0));
+ Assert.assertArrayEquals(new Object[] {1L, "Bob"}, descRows.get(1));
+ Assert.assertArrayEquals(new Object[] {0L, "Alice"}, descRows.get(2));
+ }
+
+ @Test
+ public void testJoinTables() {
+ createHiveTable("customers", customerTable.location());
+ createHiveTable("orders", orderTable.location());
+
+ List<Object[]> rows = shell.executeStatement(
+ "SELECT c.customer_id, c.first_name, o.order_id, o.total " +
+ "FROM default.customers c JOIN default.orders o ON
c.customer_id = o.customer_id " +
+ "ORDER BY c.customer_id, o.order_id"
+ );
+
+ Assert.assertArrayEquals(new Object[] {0L, "Alice", 100L, 11.11d},
rows.get(0));
+ Assert.assertArrayEquals(new Object[] {0L, "Alice", 101L, 22.22d},
rows.get(1));
+ Assert.assertArrayEquals(new Object[] {1L, "Bob", 102L, 33.33d},
rows.get(2));
+ }
+
+ private void createHiveTable(String table, String location) {
+ shell.execute(String.format(
+ "CREATE TABLE default.%s " +
+ "ROW FORMAT SERDE '%s' " +
+ "STORED AS " +
+ "INPUTFORMAT '%s' " +
+ "OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat' " +
+ "LOCATION '%s'",
+ table, IcebergSerDe.class.getName(),
HiveIcebergInputFormat.class.getName(), location));
+ }
+}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/mapred/TestHelpers.java
b/mr/src/test/java/org/apache/iceberg/mr/mapred/TestHelpers.java
deleted file mode 100644
index 0d8cdc0..0000000
--- a/mr/src/test/java/org/apache/iceberg/mr/mapred/TestHelpers.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.iceberg.mr.mapred;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DataFiles;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Files;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.avro.DataWriter;
-import org.apache.iceberg.data.orc.GenericOrcWriter;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
-import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.types.Types;
-
-import static org.apache.iceberg.types.Types.NestedField.optional;
-import static org.apache.iceberg.types.Types.NestedField.required;
-
-public class TestHelpers {
-
- private TestHelpers() {}
-
- public static DataFile writeFile(File targetFile, Table table, StructLike
partitionData, FileFormat fileFormat,
- List<Record> records) throws IOException {
- if (targetFile.exists()) {
- if (!targetFile.delete()) {
- throw new IOException("Unable to delete " +
targetFile.getAbsolutePath());
- }
- }
- FileAppender<Record> appender;
- switch (fileFormat) {
- case AVRO:
- appender = Avro.write(Files.localOutput(targetFile))
- .schema(table.schema())
- .createWriterFunc(DataWriter::create)
- .named(fileFormat.name())
- .build();
- break;
- case PARQUET:
- appender = Parquet.write(Files.localOutput(targetFile))
- .schema(table.schema())
- .createWriterFunc(GenericParquetWriter::buildWriter)
- .named(fileFormat.name())
- .build();
- break;
- case ORC:
- appender = ORC.write(Files.localOutput(targetFile))
- .schema(table.schema())
- .createWriterFunc(GenericOrcWriter::buildWriter)
- .build();
- break;
- default:
- throw new UnsupportedOperationException("Cannot write format: " +
fileFormat);
- }
-
- try {
- appender.addAll(records);
- } finally {
- appender.close();
- }
-
- DataFiles.Builder builder = DataFiles.builder(table.spec())
- .withPath(targetFile.toString())
- .withFormat(fileFormat)
- .withFileSizeInBytes(targetFile.length())
- .withMetrics(appender.metrics());
- if (partitionData != null) {
- builder.withPartition(partitionData);
- }
- return builder.build();
- }
-
- public static Record createSimpleRecord(long id, String data) {
- Schema schema = new Schema(required(1, "id", Types.StringType.get()),
- optional(2, "data", Types.LongType.get()));
- GenericRecord record = GenericRecord.create(schema);
- record.setField("id", id);
- record.setField("data", data);
- return record;
- }
-
- public static Record createCustomRecord(Schema schema, Object... dataValues)
{
- GenericRecord record = GenericRecord.create(schema);
- List<Types.NestedField> fields = schema.columns();
- for (int i = 0; i < fields.size(); i++) {
- record.set(i, dataValues[i]);
- }
- return record;
- }
-
-}
diff --git
a/mr/src/test/java/org/apache/iceberg/mr/mapred/TestIcebergSerDe.java
b/mr/src/test/java/org/apache/iceberg/mr/mapred/TestIcebergSerDe.java
index 937ddba..a2fb90d 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/mapred/TestIcebergSerDe.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/mapred/TestIcebergSerDe.java
@@ -72,9 +72,10 @@ public class TestIcebergSerDe {
IcebergSerDe serDe = new IcebergSerDe();
Record record = RandomGenericData.generate(schema, 1, 0).get(0);
- IcebergWritable writable = new IcebergWritable(record, schema);
+ Container<Record> container = new Container<>();
+ container.set(record);
- Assert.assertEquals(record, serDe.deserialize(writable));
+ Assert.assertEquals(record, serDe.deserialize(container));
}
}
diff --git
a/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java
b/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java
deleted file mode 100644
index 05842f8..0000000
---
a/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java
+++ /dev/null
@@ -1,557 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.mr.mapreduce;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.AssertHelpers;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DataFiles;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Files;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.TestHelpers.Row;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.data.RandomGenericData;
-import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.avro.DataWriter;
-import org.apache.iceberg.data.orc.GenericOrcWriter;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
-import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.hadoop.HadoopCatalog;
-import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.mr.InputFormatConfig;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import static org.apache.iceberg.types.Types.NestedField.required;
-
-@RunWith(Parameterized.class)
-public class TestIcebergInputFormat {
- static final Schema SCHEMA = new Schema(
- required(1, "data", Types.StringType.get()),
- required(2, "id", Types.LongType.get()),
- required(3, "date", Types.StringType.get()));
-
- static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
- .identity("date")
- .bucket("id", 1)
- .build();
-
- @Rule
- public TemporaryFolder temp = new TemporaryFolder();
- private HadoopTables tables;
- private Configuration conf;
-
- @Parameterized.Parameters
- public static Object[][] parameters() {
- return new Object[][]{
- new Object[]{"parquet"},
- new Object[]{"avro"},
- new Object[]{"orc"}
- };
- }
-
- private final FileFormat format;
-
- public TestIcebergInputFormat(String format) {
- this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
- }
-
- @Before
- public void before() {
- conf = new Configuration();
- tables = new HadoopTables(conf);
- }
-
- @Test
- public void testUnpartitionedTable() throws Exception {
- File location = temp.newFolder(format.name());
- Assert.assertTrue(location.delete());
- Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(),
-
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
- location.toString());
- List<Record> expectedRecords = RandomGenericData.generate(table.schema(),
1, 0L);
- DataFile dataFile = writeFile(table, null, format, expectedRecords);
- table.newAppend()
- .appendFile(dataFile)
- .commit();
- Job job = Job.getInstance(conf);
- InputFormatConfig.ConfigBuilder configBuilder =
IcebergInputFormat.configure(job);
- configBuilder.readFrom(location.toString()).schema(table.schema());
- validate(job, expectedRecords);
- }
-
- @Test
- public void testPartitionedTable() throws Exception {
- File location = temp.newFolder(format.name());
- Assert.assertTrue(location.delete());
- Table table = tables.create(SCHEMA, SPEC,
-
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
- location.toString());
- List<Record> expectedRecords = RandomGenericData.generate(table.schema(),
1, 0L);
- expectedRecords.get(0).set(2, "2020-03-20");
- DataFile dataFile = writeFile(table, Row.of("2020-03-20", 0), format,
expectedRecords);
- table.newAppend()
- .appendFile(dataFile)
- .commit();
-
- Job job = Job.getInstance(conf);
- InputFormatConfig.ConfigBuilder configBuilder =
IcebergInputFormat.configure(job);
- configBuilder.readFrom(location.toString()).schema(table.schema());
- validate(job, expectedRecords);
- }
-
- @Test
- public void testFilterExp() throws Exception {
- File location = temp.newFolder(format.name());
- Assert.assertTrue(location.delete());
- Table table = tables.create(SCHEMA, SPEC,
-
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
- location.toString());
- List<Record> expectedRecords = RandomGenericData.generate(table.schema(),
2, 0L);
- expectedRecords.get(0).set(2, "2020-03-20");
- expectedRecords.get(1).set(2, "2020-03-20");
- DataFile dataFile1 = writeFile(table, Row.of("2020-03-20", 0), format,
expectedRecords);
- DataFile dataFile2 = writeFile(table, Row.of("2020-03-21", 0), format,
- RandomGenericData.generate(table.schema(),
2, 0L));
- table.newAppend()
- .appendFile(dataFile1)
- .appendFile(dataFile2)
- .commit();
- Job job = Job.getInstance(conf);
- InputFormatConfig.ConfigBuilder configBuilder =
IcebergInputFormat.configure(job);
- configBuilder.readFrom(location.toString())
- .schema(table.schema())
- .filter(Expressions.equal("date", "2020-03-20"));
- validate(job, expectedRecords);
- }
-
- @Test
- public void testResiduals() throws Exception {
- File location = temp.newFolder(format.name());
- Assert.assertTrue(location.delete());
- Table table = tables.create(SCHEMA, SPEC,
-
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
- location.toString());
- List<Record> writeRecords = RandomGenericData.generate(table.schema(), 2,
0L);
- writeRecords.get(0).set(1, 123L);
- writeRecords.get(0).set(2, "2020-03-20");
- writeRecords.get(1).set(1, 456L);
- writeRecords.get(1).set(2, "2020-03-20");
-
- List<Record> expectedRecords = new ArrayList<>();
- expectedRecords.add(writeRecords.get(0));
-
- DataFile dataFile1 = writeFile(table, Row.of("2020-03-20", 0), format,
writeRecords);
- DataFile dataFile2 = writeFile(table, Row.of("2020-03-21", 0), format,
- RandomGenericData.generate(table.schema(), 2, 0L));
- table.newAppend()
- .appendFile(dataFile1)
- .appendFile(dataFile2)
- .commit();
- Job job = Job.getInstance(conf);
- InputFormatConfig.ConfigBuilder configBuilder =
IcebergInputFormat.configure(job);
- configBuilder.readFrom(location.toString())
- .schema(table.schema())
- .filter(Expressions.and(
- Expressions.equal("date", "2020-03-20"),
- Expressions.equal("id", 123)));
- validate(job, expectedRecords);
-
- // skip residual filtering
- job = Job.getInstance(conf);
- configBuilder = IcebergInputFormat.configure(job);
- configBuilder.skipResidualFiltering().readFrom(location.toString())
- .schema(table.schema())
- .filter(Expressions.and(
- Expressions.equal("date", "2020-03-20"),
- Expressions.equal("id", 123)));
- validate(job, writeRecords);
- }
-
- @Test
- public void testFailedResidualFiltering() throws Exception {
- File location = temp.newFolder(format.name());
- Assert.assertTrue(location.delete());
- Table table = tables.create(SCHEMA, SPEC,
- ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
- location.toString());
- List<Record> expectedRecords = RandomGenericData.generate(table.schema(),
2, 0L);
- expectedRecords.get(0).set(2, "2020-03-20");
- expectedRecords.get(1).set(2, "2020-03-20");
-
- DataFile dataFile1 = writeFile(table, Row.of("2020-03-20", 0), format,
expectedRecords);
- table.newAppend()
- .appendFile(dataFile1)
- .commit();
-
- Job jobShouldFail1 = Job.getInstance(conf);
- InputFormatConfig.ConfigBuilder configBuilder =
IcebergInputFormat.configure(jobShouldFail1);
- configBuilder.useHiveRows().readFrom(location.toString())
- .schema(table.schema())
- .filter(Expressions.and(
- Expressions.equal("date", "2020-03-20"),
- Expressions.equal("id", 0)));
- AssertHelpers.assertThrows(
- "Residuals are not evaluated today for Iceberg Generics In memory
model of HIVE",
- UnsupportedOperationException.class, "Filter expression
ref(name=\"id\") == 0 is not completely satisfied.",
- () -> validate(jobShouldFail1, expectedRecords));
-
- Job jobShouldFail2 = Job.getInstance(conf);
- configBuilder = IcebergInputFormat.configure(jobShouldFail2);
- configBuilder.usePigTuples().readFrom(location.toString())
- .schema(table.schema())
- .filter(Expressions.and(
- Expressions.equal("date", "2020-03-20"),
- Expressions.equal("id", 0)));
- AssertHelpers.assertThrows(
- "Residuals are not evaluated today for Iceberg Generics In memory
model of PIG",
- UnsupportedOperationException.class, "Filter expression
ref(name=\"id\") == 0 is not completely satisfied.",
- () -> validate(jobShouldFail2, expectedRecords));
- }
-
- @Test
- public void testProjection() throws Exception {
- File location = temp.newFolder(format.name());
- Assert.assertTrue(location.delete());
- Schema projectedSchema = TypeUtil.select(SCHEMA, ImmutableSet.of(1));
- Table table = tables.create(SCHEMA, SPEC,
-
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
- location.toString());
- List<Record> inputRecords = RandomGenericData.generate(table.schema(), 1,
0L);
- DataFile dataFile = writeFile(table, Row.of("2020-03-20", 0), format,
inputRecords);
- table.newAppend()
- .appendFile(dataFile)
- .commit();
-
- Job job = Job.getInstance(conf);
- InputFormatConfig.ConfigBuilder configBuilder =
IcebergInputFormat.configure(job);
- configBuilder
- .schema(table.schema())
- .readFrom(location.toString())
- .project(projectedSchema);
- List<Record> outputRecords = readRecords(job.getConfiguration());
- Assert.assertEquals(inputRecords.size(), outputRecords.size());
- Assert.assertEquals(projectedSchema.asStruct(),
outputRecords.get(0).struct());
- }
-
- private static final Schema LOG_SCHEMA = new Schema(
- Types.NestedField.optional(1, "id", Types.IntegerType.get()),
- Types.NestedField.optional(2, "date", Types.StringType.get()),
- Types.NestedField.optional(3, "level", Types.StringType.get()),
- Types.NestedField.optional(4, "message", Types.StringType.get())
- );
-
- private static final PartitionSpec IDENTITY_PARTITION_SPEC =
-
PartitionSpec.builderFor(LOG_SCHEMA).identity("date").identity("level").build();
-
- @Test
- public void testIdentityPartitionProjections() throws Exception {
- File location = temp.newFolder(format.name());
- Assert.assertTrue(location.delete());
- Table table = tables.create(LOG_SCHEMA, IDENTITY_PARTITION_SPEC,
-
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
- location.toString());
-
- List<Record> inputRecords = RandomGenericData.generate(LOG_SCHEMA, 10, 0);
- Integer idx = 0;
- AppendFiles append = table.newAppend();
- for (Record record : inputRecords) {
- record.set(1, "2020-03-2" + idx);
- record.set(2, idx.toString());
- append.appendFile(writeFile(table, Row.of("2020-03-2" + idx,
idx.toString()), format, ImmutableList.of(record)));
- idx += 1;
- }
- append.commit();
-
- // individual fields
- validateIdentityPartitionProjections(location.toString(), table.schema(),
- withColumns("date"), inputRecords);
- validateIdentityPartitionProjections(location.toString(), table.schema(),
- withColumns("level"), inputRecords);
- validateIdentityPartitionProjections(location.toString(), table.schema(),
- withColumns("message"), inputRecords);
- validateIdentityPartitionProjections(location.toString(), table.schema(),
- withColumns("id"), inputRecords);
- // field pairs
- validateIdentityPartitionProjections(location.toString(), table.schema(),
- withColumns("date", "message"), inputRecords);
- validateIdentityPartitionProjections(location.toString(), table.schema(),
- withColumns("level", "message"), inputRecords);
- validateIdentityPartitionProjections(location.toString(), table.schema(),
- withColumns("date", "level"), inputRecords);
- // out-of-order pairs
- validateIdentityPartitionProjections(location.toString(), table.schema(),
- withColumns("message", "date"), inputRecords);
- validateIdentityPartitionProjections(location.toString(), table.schema(),
- withColumns("message", "level"), inputRecords);
- validateIdentityPartitionProjections(location.toString(), table.schema(),
- withColumns("level", "date"), inputRecords);
- // full projection
- validateIdentityPartitionProjections(location.toString(), table.schema(),
LOG_SCHEMA, inputRecords);
- // out-of-order triplets
- validateIdentityPartitionProjections(location.toString(), table.schema(),
- withColumns("date", "level", "message"), inputRecords);
- validateIdentityPartitionProjections(location.toString(), table.schema(),
- withColumns("level", "date", "message"), inputRecords);
- validateIdentityPartitionProjections(location.toString(), table.schema(),
- withColumns("date", "message", "level"), inputRecords);
- validateIdentityPartitionProjections(location.toString(), table.schema(),
- withColumns("level", "message", "date"), inputRecords);
- validateIdentityPartitionProjections(location.toString(), table.schema(),
- withColumns("message", "date", "level"), inputRecords);
- validateIdentityPartitionProjections(location.toString(), table.schema(),
- withColumns("message", "level", "date"), inputRecords);
- }
-
- private static Schema withColumns(String... names) {
- Map<String, Integer> indexByName =
TypeUtil.indexByName(LOG_SCHEMA.asStruct());
- Set<Integer> projectedIds = Sets.newHashSet();
- for (String name : names) {
- projectedIds.add(indexByName.get(name));
- }
- return TypeUtil.select(LOG_SCHEMA, projectedIds);
- }
-
- private void validateIdentityPartitionProjections(
- String tablePath, Schema tableSchema, Schema projectedSchema,
List<Record> inputRecords) throws Exception {
- Job job = Job.getInstance(conf);
- InputFormatConfig.ConfigBuilder configBuilder =
IcebergInputFormat.configure(job);
- configBuilder
- .schema(tableSchema)
- .readFrom(tablePath)
- .project(projectedSchema);
- List<Record> actualRecords = readRecords(job.getConfiguration());
-
- Set<String> fieldNames =
TypeUtil.indexByName(projectedSchema.asStruct()).keySet();
- for (int pos = 0; pos < inputRecords.size(); pos++) {
- Record inputRecord = inputRecords.get(pos);
- Record actualRecord = actualRecords.get(pos);
- Assert.assertEquals("Projected schema should match",
projectedSchema.asStruct(), actualRecord.struct());
- for (String name : fieldNames) {
- Assert.assertEquals(
- "Projected field " + name + " should match",
inputRecord.getField(name), actualRecord.getField(name));
- }
- }
- }
-
- @Test
- public void testSnapshotReads() throws Exception {
- File location = temp.newFolder(format.name());
- Assert.assertTrue(location.delete());
- Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(),
-
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
- location.toString());
- List<Record> expectedRecords = RandomGenericData.generate(table.schema(),
1, 0L);
- table.newAppend()
- .appendFile(writeFile(table, null, format, expectedRecords))
- .commit();
- long snapshotId = table.currentSnapshot().snapshotId();
- table.newAppend()
- .appendFile(writeFile(table, null, format,
RandomGenericData.generate(table.schema(), 1, 0L)))
- .commit();
-
- Job job = Job.getInstance(conf);
- InputFormatConfig.ConfigBuilder configBuilder =
IcebergInputFormat.configure(job);
- configBuilder
- .schema(table.schema())
- .readFrom(location.toString())
- .snapshotId(snapshotId);
-
- validate(job, expectedRecords);
- }
-
- @Test
- public void testLocality() throws Exception {
- File location = temp.newFolder(format.name());
- Assert.assertTrue(location.delete());
- Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(),
-
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
- location.toString());
- List<Record> expectedRecords = RandomGenericData.generate(table.schema(),
1, 0L);
- table.newAppend()
- .appendFile(writeFile(table, null, format, expectedRecords))
- .commit();
- Job job = Job.getInstance(conf);
- InputFormatConfig.ConfigBuilder configBuilder =
IcebergInputFormat.configure(job);
- configBuilder.readFrom(location.toString()).schema(table.schema());
-
- for (InputSplit split : splits(job.getConfiguration())) {
- Assert.assertArrayEquals(IcebergInputFormat.IcebergSplit.ANYWHERE,
split.getLocations());
- }
-
- configBuilder.preferLocality();
- for (InputSplit split : splits(job.getConfiguration())) {
- Assert.assertArrayEquals(new String[]{"localhost"},
split.getLocations());
- }
- }
-
- public static class HadoopCatalogFunc implements Function<Configuration,
Catalog> {
- @Override
- public Catalog apply(Configuration conf) {
- return new HadoopCatalog(conf, conf.get("warehouse.location"));
- }
- }
-
- @Test
- public void testCustomCatalog() throws Exception {
- conf = new Configuration();
- conf.set("warehouse.location",
temp.newFolder("hadoop_catalog").getAbsolutePath());
-
- Catalog catalog = new HadoopCatalogFunc().apply(conf);
- TableIdentifier tableIdentifier = TableIdentifier.of("db", "t");
- Table table = catalog.createTable(tableIdentifier, SCHEMA, SPEC,
-
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
- List<Record> expectedRecords = RandomGenericData.generate(table.schema(),
1, 0L);
- expectedRecords.get(0).set(2, "2020-03-20");
- DataFile dataFile = writeFile(table, Row.of("2020-03-20", 0), format,
expectedRecords);
- table.newAppend()
- .appendFile(dataFile)
- .commit();
-
- Job job = Job.getInstance(conf);
- InputFormatConfig.ConfigBuilder configBuilder =
IcebergInputFormat.configure(job);
- configBuilder
- .catalogFunc(HadoopCatalogFunc.class)
- .schema(table.schema())
- .readFrom(tableIdentifier.toString());
- validate(job, expectedRecords);
- }
-
- private static void validate(Job job, List<Record> expectedRecords) {
- List<Record> actualRecords = readRecords(job.getConfiguration());
- Assert.assertEquals(expectedRecords, actualRecords);
- }
-
- private static <T> List<InputSplit> splits(Configuration conf) {
- TaskAttemptContext context = new TaskAttemptContextImpl(conf, new
TaskAttemptID());
- IcebergInputFormat<T> icebergInputFormat = new IcebergInputFormat<>();
- return icebergInputFormat.getSplits(context);
- }
-
- private static <T> List<T> readRecords(Configuration conf) {
- TaskAttemptContext context = new TaskAttemptContextImpl(conf, new
TaskAttemptID());
- IcebergInputFormat<T> icebergInputFormat = new IcebergInputFormat<>();
- List<InputSplit> splits = icebergInputFormat.getSplits(context);
- return
- FluentIterable
- .from(splits)
- .transformAndConcat(split -> readRecords(icebergInputFormat,
split, context))
- .toList();
- }
-
- private static <T> Iterable<T> readRecords(
- IcebergInputFormat<T> inputFormat, InputSplit split, TaskAttemptContext
context) {
- RecordReader<Void, T> recordReader = inputFormat.createRecordReader(split,
context);
- List<T> records = new ArrayList<>();
- try {
- recordReader.initialize(split, context);
- while (recordReader.nextKeyValue()) {
- records.add(recordReader.getCurrentValue());
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- return records;
- }
-
- private DataFile writeFile(
- Table table, StructLike partitionData, FileFormat fileFormat,
List<Record> records) throws IOException {
- File file = temp.newFile();
- Assert.assertTrue(file.delete());
- FileAppender<Record> appender;
- switch (fileFormat) {
- case AVRO:
- appender = Avro.write(Files.localOutput(file))
- .schema(table.schema())
- .createWriterFunc(DataWriter::create)
- .named(fileFormat.name())
- .build();
- break;
- case PARQUET:
- appender = Parquet.write(Files.localOutput(file))
- .schema(table.schema())
- .createWriterFunc(GenericParquetWriter::buildWriter)
- .named(fileFormat.name())
- .build();
- break;
- case ORC:
- appender = ORC.write(Files.localOutput(file))
- .schema(table.schema())
- .createWriterFunc(GenericOrcWriter::buildWriter)
- .build();
- break;
- default:
- throw new UnsupportedOperationException("Cannot write format: " +
fileFormat);
- }
-
- try {
- appender.addAll(records);
- } finally {
- appender.close();
- }
-
- DataFiles.Builder builder = DataFiles.builder(table.spec())
- .withPath(file.toString())
- .withFormat(format)
- .withFileSizeInBytes(file.length())
- .withMetrics(appender.metrics());
- if (partitionData != null) {
- builder.withPartition(partitionData);
- }
- return builder.build();
- }
-}