This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new f9f5bcf Flink: Add a rewrite datafile action for flink (#1623)
f9f5bcf is described below
commit f9f5bcfe66be2c808a4f14f59c195f92c31de8fa
Author: JunZhang <[email protected]>
AuthorDate: Wed Nov 4 19:34:10 2020 +0800
Flink: Add a rewrite datafile action for flink (#1623)
---
.../java/org/apache/iceberg/util/PropertyUtil.java | 9 +
.../Actions.java} | 49 ++--
.../flink/actions/RewriteDataFilesAction.java | 68 +++++
.../iceberg/flink/sink/TaskWriterFactory.java | 2 +-
.../iceberg/flink/source/RowDataRewriter.java | 155 +++++++++++
.../org/apache/iceberg/flink/FlinkTestBase.java | 2 +-
.../flink/actions/TestRewriteDataFilesAction.java | 283 +++++++++++++++++++++
.../iceberg/actions/RewriteDataFilesAction.java | 6 +-
8 files changed, 547 insertions(+), 27 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
b/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
index 5a5fd6f..2df88c0 100644
--- a/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
@@ -52,4 +52,13 @@ public class PropertyUtil {
}
return defaultValue;
}
+
+ /**
+ * Returns the string value of a property, falling back to a default when it is not set.
+ *
+ * @param properties map of property names to values
+ * @param property name of the property to look up
+ * @param defaultValue value returned when the property is absent or mapped to null
+ * @return the property's value, or {@code defaultValue} if it is not set
+ */
+ public static String propertyAsString(Map<String, String> properties,
+ String property, String defaultValue) {
+ String value = properties.get(property);
+ if (value != null) {
+ return value;
+ }
+ return defaultValue;
+ }
}
diff --git
a/flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java
b/flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java
similarity index 52%
copy from
flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java
copy to flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java
index 6ed7696..3fc2bb7 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java
@@ -17,30 +17,31 @@
* under the License.
*/
-package org.apache.iceberg.flink.sink;
+package org.apache.iceberg.flink.actions;
-import java.io.Serializable;
-import org.apache.iceberg.io.TaskWriter;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.iceberg.Table;
+
+/**
+ * Entry point for creating Flink-based actions on an Iceberg {@link Table}.
+ */
+public class Actions {
+
+ private final StreamExecutionEnvironment env;
+ private final Table table;
+
+ private Actions(StreamExecutionEnvironment env, Table table) {
+ this.env = env;
+ this.table = table;
+ }
+
+ /**
+ * Creates an action factory for {@code table} that runs its jobs in the given environment.
+ *
+ * @param env Flink execution environment used by the created actions
+ * @param table table the created actions operate on
+ * @return a new action factory
+ */
+ public static Actions forTable(StreamExecutionEnvironment env, Table table) {
+ return new Actions(env, table);
+ }
+
+ /**
+ * Creates an action factory for {@code table} using
+ * {@link StreamExecutionEnvironment#getExecutionEnvironment()}.
+ *
+ * @param table table the created actions operate on
+ * @return a new action factory
+ */
+ public static Actions forTable(Table table) {
+ return new Actions(StreamExecutionEnvironment.getExecutionEnvironment(), table);
+ }
+
+ /**
+ * Returns a new action that rewrites the data files of this factory's table.
+ *
+ * @return a new {@link RewriteDataFilesAction}
+ */
+ public RewriteDataFilesAction rewriteDataFiles() {
+ return new RewriteDataFilesAction(env, table);
+ }
-/**
- * Factory to create {@link TaskWriter}
- *
- * @param <T> data type of record.
- */
-interface TaskWriterFactory<T> extends Serializable {
-
- /**
- * Initialize the factory with a given taskId and attemptId.
- *
- * @param taskId the identifier of task.
- * @param attemptId the attempt id of this task.
- */
- void initialize(int taskId, int attemptId);
-
- /**
- * Initialize a {@link TaskWriter} with given task id and attempt id.
- *
- * @return a newly created task writer.
- */
- TaskWriter<T> create();
}
diff --git
a/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
b/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
new file mode 100644
index 0000000..12a8000
--- /dev/null
+++
b/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.actions;
+
+import java.util.List;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.BaseRewriteDataFilesAction;
+import org.apache.iceberg.flink.source.RowDataRewriter;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * Flink implementation of {@link BaseRewriteDataFilesAction}: combined scan tasks are rewritten by
+ * a Flink job built on the configured {@link StreamExecutionEnvironment}.
+ */
+public class RewriteDataFilesAction extends BaseRewriteDataFilesAction<RewriteDataFilesAction> {
+
+ private final StreamExecutionEnvironment env;
+ private int maxParallelism;
+
+ public RewriteDataFilesAction(StreamExecutionEnvironment env, Table table) {
+ super(table);
+ this.env = env;
+ // Until maxParallelism(int) is called, the environment's parallelism is the upper bound.
+ this.maxParallelism = env.getParallelism();
+ }
+
+ @Override
+ protected FileIO fileIO() {
+ return table().io();
+ }
+
+ /**
+ * Rewrites the given combined scan tasks in a Flink job.
+ *
+ * <p>The job parallelism is the number of tasks, capped at the configured max parallelism.
+ */
+ @Override
+ protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) {
+ int parallelism = Math.min(combinedScanTasks.size(), maxParallelism);
+ DataStream<CombinedScanTask> dataStream = env.fromCollection(combinedScanTasks);
+ RowDataRewriter rowDataRewriter = new RowDataRewriter(table(), caseSensitive(), fileIO(), encryptionManager());
+ return rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);
+ }
+
+ @Override
+ protected RewriteDataFilesAction self() {
+ return this;
+ }
+
+ /**
+ * Sets the maximum parallelism of the Flink job that rewrites the data files.
+ *
+ * @param parallelism maximum parallelism; must be positive
+ * @return this action, so calls can be chained like the other builder options
+ * @throws IllegalArgumentException if {@code parallelism} is not positive
+ */
+ public RewriteDataFilesAction maxParallelism(int parallelism) {
+ Preconditions.checkArgument(parallelism > 0, "Invalid max parallelism %d", parallelism);
+ this.maxParallelism = parallelism;
+ return this;
+ }
+}
diff --git
a/flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java
b/flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java
index 6ed7696..9d56ec6 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java
@@ -27,7 +27,7 @@ import org.apache.iceberg.io.TaskWriter;
*
* @param <T> data type of record.
*/
-interface TaskWriterFactory<T> extends Serializable {
+public interface TaskWriterFactory<T> extends Serializable {
/**
* Initialize the factory with a given taskId and attemptId.
diff --git
a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
new file mode 100644
index 0000000..65a3ca9
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+public class RowDataRewriter {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RowDataRewriter.class);
+
+ private final Schema schema;
+ private final FileFormat format;
+ private final String nameMapping;
+ private final FileIO io;
+ private final boolean caseSensitive;
+ private final EncryptionManager encryptionManager;
+ private final TaskWriterFactory<RowData> taskWriterFactory;
+
+ public RowDataRewriter(Table table, boolean caseSensitive, FileIO io,
EncryptionManager encryptionManager) {
+ this.schema = table.schema();
+ this.caseSensitive = caseSensitive;
+ this.io = io;
+ this.encryptionManager = encryptionManager;
+ this.nameMapping = PropertyUtil.propertyAsString(table.properties(),
DEFAULT_NAME_MAPPING, null);
+
+ String formatString = PropertyUtil.propertyAsString(table.properties(),
TableProperties.DEFAULT_FILE_FORMAT,
+ TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+ this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+ RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
+ this.taskWriterFactory = new RowDataTaskWriterFactory(
+ table.schema(),
+ flinkSchema,
+ table.spec(),
+ table.locationProvider(),
+ io,
+ encryptionManager,
+ Long.MAX_VALUE,
+ format,
+ table.properties());
+ }
+
+ public List<DataFile> rewriteDataForTasks(DataStream<CombinedScanTask>
dataStream, int parallelism) {
+ RewriteMap map = new RewriteMap(schema, nameMapping, io, caseSensitive,
encryptionManager, taskWriterFactory);
+ DataStream<List<DataFile>> ds =
dataStream.map(map).setParallelism(parallelism);
+ return
Lists.newArrayList(DataStreamUtils.collect(ds)).stream().flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ }
+
+ public static class RewriteMap extends RichMapFunction<CombinedScanTask,
List<DataFile>> {
+
+ private TaskWriter<RowData> writer;
+ private int subTaskId;
+ private int attemptId;
+
+ private final Schema schema;
+ private final String nameMapping;
+ private final FileIO io;
+ private final boolean caseSensitive;
+ private final EncryptionManager encryptionManager;
+ private final TaskWriterFactory<RowData> taskWriterFactory;
+
+ public RewriteMap(Schema schema, String nameMapping, FileIO io, boolean
caseSensitive,
+ EncryptionManager encryptionManager,
TaskWriterFactory<RowData> taskWriterFactory) {
+ this.schema = schema;
+ this.nameMapping = nameMapping;
+ this.io = io;
+ this.caseSensitive = caseSensitive;
+ this.encryptionManager = encryptionManager;
+ this.taskWriterFactory = taskWriterFactory;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+ this.attemptId = getRuntimeContext().getAttemptNumber();
+ // Initialize the task writer factory.
+ this.taskWriterFactory.initialize(subTaskId, attemptId);
+ }
+
+ @Override
+ public List<DataFile> map(CombinedScanTask task) throws Exception {
+ // Initialize the task writer.
+ this.writer = taskWriterFactory.create();
+ try (RowDataIterator iterator =
+ new RowDataIterator(task, io, encryptionManager, schema,
schema, nameMapping, caseSensitive)) {
+ while (iterator.hasNext()) {
+ RowData rowData = iterator.next();
+ writer.write(rowData);
+ }
+ return Lists.newArrayList(writer.complete());
+ } catch (Throwable originalThrowable) {
+ try {
+ LOG.error("Aborting commit for (subTaskId {}, attemptId {})",
subTaskId, attemptId);
+ writer.abort();
+ LOG.error("Aborted commit for (subTaskId {}, attemptId {})",
subTaskId, attemptId);
+ } catch (Throwable inner) {
+ if (originalThrowable != inner) {
+ originalThrowable.addSuppressed(inner);
+ LOG.warn("Suppressing exception in catch: {}", inner.getMessage(),
inner);
+ }
+ }
+
+ if (originalThrowable instanceof Exception) {
+ throw originalThrowable;
+ } else {
+ throw new RuntimeException(originalThrowable);
+ }
+ }
+ }
+ }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
index b680af7..6782267 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
@@ -72,7 +72,7 @@ public abstract class FlinkTestBase extends AbstractTestBase {
return tEnv;
}
- List<Object[]> sql(String query, Object... args) {
+ protected List<Object[]> sql(String query, Object... args) {
TableResult tableResult = getTableEnv().executeSql(String.format(query,
args));
tableResult.getJobClient().ifPresent(c -> {
try {
diff --git
a/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
b/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
new file mode 100644
index 0000000..748a4ce
--- /dev/null
+++
b/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.iceberg.flink.actions;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFilesActionResult;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.FlinkCatalogTestBase;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.flink.SimpleDataUtil.RECORD;
+
+@RunWith(Parameterized.class)
+public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
+
+ private static final String TABLE_NAME_UNPARTITIONED = "test_table_unpartitioned";
+ private static final String TABLE_NAME_PARTITIONED = "test_table_partitioned";
+
+ // Record schema (id, data, spec) used to build the expected rows of the partitioned table.
+ private static final Schema PARTITIONED_SCHEMA = new Schema(
+ Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "data", Types.StringType.get()),
+ Types.NestedField.optional(3, "spec", Types.StringType.get())
+ );
+
+ private final FileFormat format;
+ private Table icebergTableUnPartitioned;
+ private Table icebergTablePartitioned;
+
+ public TestRewriteDataFilesAction(String catalogName, String[] baseNamespace, FileFormat format) {
+ super(catalogName, baseNamespace);
+ this.format = format;
+ }
+
+ @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}")
+ public static Iterable<Object[]> parameters() {
+ List<Object[]> parameters = Lists.newArrayList();
+ for (FileFormat format : new FileFormat[] {FileFormat.AVRO, FileFormat.ORC, FileFormat.PARQUET}) {
+ for (Object[] catalogParams : FlinkCatalogTestBase.parameters()) {
+ String catalogName = (String) catalogParams[0];
+ String[] baseNamespace = (String[]) catalogParams[1];
+ parameters.add(new Object[] {catalogName, baseNamespace, format});
+ }
+ }
+ return parameters;
+ }
+
+ @Before
+ public void before() {
+ super.before();
+ sql("CREATE DATABASE %s", flinkDatabase);
+ sql("USE CATALOG %s", catalogName);
+ sql("USE %s", DATABASE);
+ sql("CREATE TABLE %s (id int, data varchar) with ('write.format.default'='%s')", TABLE_NAME_UNPARTITIONED,
+ format.name());
+ icebergTableUnPartitioned = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace,
+ TABLE_NAME_UNPARTITIONED));
+
+ sql("CREATE TABLE %s (id int, data varchar,spec varchar) " +
+ " PARTITIONED BY (data,spec) with ('write.format.default'='%s')",
+ TABLE_NAME_PARTITIONED, format.name());
+ icebergTablePartitioned = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace,
+ TABLE_NAME_PARTITIONED));
+ }
+
+ @After
+ public void clean() {
+ sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_UNPARTITIONED);
+ sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_PARTITIONED);
+ sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+ super.clean();
+ }
+
+ /**
+ * Plans a full scan of {@code table} and returns its data files, closing the planned tasks afterwards.
+ */
+ private static List<DataFile> dataFiles(Table table) throws IOException {
+ try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+ return Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+ }
+ }
+
+ @Test
+ public void testRewriteDataFilesEmptyTable() throws Exception {
+ Assert.assertNull("Table must be empty", icebergTableUnPartitioned.currentSnapshot());
+ Actions.forTable(icebergTableUnPartitioned)
+ .rewriteDataFiles()
+ .execute();
+ Assert.assertNull("Table must stay empty", icebergTableUnPartitioned.currentSnapshot());
+ }
+
+ @Test
+ public void testRewriteDataFilesUnpartitionedTable() throws Exception {
+ sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_UNPARTITIONED);
+ sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_UNPARTITIONED);
+
+ icebergTableUnPartitioned.refresh();
+
+ List<DataFile> dataFilesBefore = dataFiles(icebergTableUnPartitioned);
+ Assert.assertEquals("Should have 2 data files before rewrite", 2, dataFilesBefore.size());
+
+ RewriteDataFilesActionResult result =
+ Actions.forTable(icebergTableUnPartitioned)
+ .rewriteDataFiles()
+ .execute();
+
+ Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
+ Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());
+
+ icebergTableUnPartitioned.refresh();
+
+ List<DataFile> dataFilesAfter = dataFiles(icebergTableUnPartitioned);
+ Assert.assertEquals("Should have 1 data files after rewrite", 1, dataFilesAfter.size());
+
+ // Assert the table records as expected.
+ SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, Lists.newArrayList(
+ SimpleDataUtil.createRecord(1, "hello"),
+ SimpleDataUtil.createRecord(2, "world")
+ ));
+ }
+
+ @Test
+ public void testRewriteDataFilesPartitionedTable() throws Exception {
+ sql("INSERT INTO %s SELECT 1, 'hello' ,'a'", TABLE_NAME_PARTITIONED);
+ sql("INSERT INTO %s SELECT 2, 'hello' ,'a'", TABLE_NAME_PARTITIONED);
+ sql("INSERT INTO %s SELECT 3, 'world' ,'b'", TABLE_NAME_PARTITIONED);
+ sql("INSERT INTO %s SELECT 4, 'world' ,'b'", TABLE_NAME_PARTITIONED);
+
+ icebergTablePartitioned.refresh();
+
+ List<DataFile> dataFilesBefore = dataFiles(icebergTablePartitioned);
+ Assert.assertEquals("Should have 4 data files before rewrite", 4, dataFilesBefore.size());
+
+ RewriteDataFilesActionResult result =
+ Actions.forTable(icebergTablePartitioned)
+ .rewriteDataFiles()
+ .execute();
+
+ Assert.assertEquals("Action should rewrite 4 data files", 4, result.deletedDataFiles().size());
+ Assert.assertEquals("Action should add 2 data file", 2, result.addedDataFiles().size());
+
+ icebergTablePartitioned.refresh();
+
+ List<DataFile> dataFilesAfter = dataFiles(icebergTablePartitioned);
+ Assert.assertEquals("Should have 2 data files after rewrite", 2, dataFilesAfter.size());
+
+ // Assert the table records as expected.
+ Record record = GenericRecord.create(PARTITIONED_SCHEMA);
+ SimpleDataUtil.assertTableRecords(icebergTablePartitioned, Lists.newArrayList(
+ record.copy("id", 1, "data", "hello", "spec", "a"),
+ record.copy("id", 2, "data", "hello", "spec", "a"),
+ record.copy("id", 3, "data", "world", "spec", "b"),
+ record.copy("id", 4, "data", "world", "spec", "b")
+ ));
+ }
+
+ @Test
+ public void testRewriteDataFilesWithFilter() throws Exception {
+ sql("INSERT INTO %s SELECT 1, 'hello' ,'a'", TABLE_NAME_PARTITIONED);
+ sql("INSERT INTO %s SELECT 2, 'hello' ,'a'", TABLE_NAME_PARTITIONED);
+ sql("INSERT INTO %s SELECT 3, 'world' ,'a'", TABLE_NAME_PARTITIONED);
+ sql("INSERT INTO %s SELECT 4, 'world' ,'b'", TABLE_NAME_PARTITIONED);
+ sql("INSERT INTO %s SELECT 5, 'world' ,'b'", TABLE_NAME_PARTITIONED);
+
+ icebergTablePartitioned.refresh();
+
+ List<DataFile> dataFilesBefore = dataFiles(icebergTablePartitioned);
+ Assert.assertEquals("Should have 5 data files before rewrite", 5, dataFilesBefore.size());
+
+ // Only partition (data='hello', spec='a') matches both filters; it holds 2 files.
+ RewriteDataFilesActionResult result =
+ Actions.forTable(icebergTablePartitioned)
+ .rewriteDataFiles()
+ .filter(Expressions.equal("spec", "a"))
+ .filter(Expressions.startsWith("data", "he"))
+ .execute();
+
+ Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
+ Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());
+
+ icebergTablePartitioned.refresh();
+
+ List<DataFile> dataFilesAfter = dataFiles(icebergTablePartitioned);
+ Assert.assertEquals("Should have 4 data files after rewrite", 4, dataFilesAfter.size());
+
+ // Assert the table records as expected.
+ Record record = GenericRecord.create(PARTITIONED_SCHEMA);
+ SimpleDataUtil.assertTableRecords(icebergTablePartitioned, Lists.newArrayList(
+ record.copy("id", 1, "data", "hello", "spec", "a"),
+ record.copy("id", 2, "data", "hello", "spec", "a"),
+ record.copy("id", 3, "data", "world", "spec", "a"),
+ record.copy("id", 4, "data", "world", "spec", "b"),
+ record.copy("id", 5, "data", "world", "spec", "b")
+ ));
+ }
+
+ @Test
+ public void testRewriteLargeTableHasResiduals() throws IOException {
+ // The table is unpartitioned, so all records belong to the same partition.
+ // Even ids go to the first insert and odd ids to the second; both contain rows with data = '0'.
+ List<String> records1 = Lists.newArrayList();
+ List<String> records2 = Lists.newArrayList();
+ List<Record> expected = Lists.newArrayList();
+ for (int i = 0; i < 100; i++) {
+ int id = i;
+ String data = String.valueOf(i % 3);
+ if (i % 2 == 0) {
+ records1.add("(" + id + ",'" + data + "')");
+ } else {
+ records2.add("(" + id + ",'" + data + "')");
+ }
+ Record record = RECORD.copy();
+ record.setField("id", id);
+ record.setField("data", data);
+ expected.add(record);
+ }
+
+ sql("INSERT INTO %s values " + StringUtils.join(records1, ","), TABLE_NAME_UNPARTITIONED);
+ sql("INSERT INTO %s values " + StringUtils.join(records2, ","), TABLE_NAME_UNPARTITIONED);
+
+ icebergTableUnPartitioned.refresh();
+
+ try (CloseableIterable<FileScanTask> tasks = icebergTableUnPartitioned.newScan()
+ .ignoreResiduals()
+ .filter(Expressions.equal("data", "0"))
+ .planFiles()) {
+ for (FileScanTask task : tasks) {
+ Assert.assertEquals("Residuals must be ignored", Expressions.alwaysTrue(), task.residual());
+ }
+ List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+ Assert.assertEquals("Should have 2 data files before rewrite", 2, dataFiles.size());
+ }
+
+ Actions actions = Actions.forTable(icebergTableUnPartitioned);
+
+ RewriteDataFilesActionResult result = actions
+ .rewriteDataFiles()
+ .filter(Expressions.equal("data", "0"))
+ .execute();
+ Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
+ Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());
+
+ // Assert the table records as expected.
+ SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
+ }
+}
diff --git
a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
index b22da52..1f40738 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
@@ -36,6 +36,7 @@ public class RewriteDataFilesAction
extends BaseRewriteDataFilesAction<RewriteDataFilesAction> {
private final JavaSparkContext sparkContext;
+ private FileIO fileIO;
RewriteDataFilesAction(SparkSession spark, Table table) {
super(table);
@@ -49,7 +50,10 @@ public class RewriteDataFilesAction
@Override
protected FileIO fileIO() {
- return SparkUtil.serializableFileIO(table());
+ // Build the serializable FileIO on first use only; later calls get the same cached instance.
+ if (this.fileIO != null) {
+ return this.fileIO;
+ }
+ this.fileIO = SparkUtil.serializableFileIO(table());
+ return this.fileIO;
}
@Override