This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 0a24350 Flink 1.14: Add tests to check whether should remove meta
columns in source reader (#3893)
0a24350 is described below
commit 0a24350d7c4fd86935f61e0d77ab27c49fa81e69
Author: liliwei <[email protected]>
AuthorDate: Thu Jan 20 10:34:43 2022 +0800
Flink 1.14: Add tests to check whether should remove meta columns in source
reader (#3893)
---
.../java/org/apache/iceberg/flink/TestHelpers.java | 29 ++--
.../flink/source/TestProjectMetaColumn.java | 180 +++++++++++++++++++++
2 files changed, 199 insertions(+), 10 deletions(-)
diff --git
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
index 8e2af6e..e2bf306 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
@@ -31,6 +31,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -87,19 +88,23 @@ public class TestHelpers {
return RowDataUtil.clone(from, null, rowType, fieldSerializers);
}
- public static List<RowData> readRowData(FlinkInputFormat inputFormat,
RowType rowType) throws IOException {
- FlinkInputSplit[] splits = inputFormat.createInputSplits(0);
- List<RowData> results = Lists.newArrayList();
-
- for (FlinkInputSplit s : splits) {
- inputFormat.open(s);
- while (!inputFormat.reachedEnd()) {
- RowData row = inputFormat.nextRecord(null);
- results.add(copyRowData(row, rowType));
+ public static void readRowData(FlinkInputFormat input, Consumer<RowData>
visitor) throws IOException {
+ for (FlinkInputSplit s : input.createInputSplits(0)) {
+ input.open(s);
+ try {
+ while (!input.reachedEnd()) {
+ RowData row = input.nextRecord(null);
+ visitor.accept(row);
+ }
+ } finally {
+ input.close();
}
}
- inputFormat.close();
+ }
+ public static List<RowData> readRowData(FlinkInputFormat inputFormat,
RowType rowType) throws IOException {
+ List<RowData> results = Lists.newArrayList();
+ readRowData(inputFormat, row -> results.add(copyRowData(row, rowType)));
return results;
}
@@ -125,6 +130,10 @@ public class TestHelpers {
assertRows(results, expected);
}
+ public static void assertRows(List<RowData> results, List<RowData> expected,
RowType rowType) {
+ assertRows(convertRowDataToRow(results, rowType),
convertRowDataToRow(expected, rowType));
+ }
+
public static void assertRows(List<Row> results, List<Row> expected) {
expected.sort(Comparator.comparing(Row::toString));
results.sort(Comparator.comparing(Row::toString));
diff --git
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java
new file mode 100644
index 0000000..baccadc
--- /dev/null
+++
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.flink.data.RowDataProjection;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestProjectMetaColumn {
+
+ @Rule
+ public final TemporaryFolder folder = new TemporaryFolder();
+ private final FileFormat format;
+
+ @Parameterized.Parameters(name = "fileFormat={0}")
+ public static Iterable<Object[]> parameters() {
+ return Lists.newArrayList(
+ new Object[] {FileFormat.PARQUET},
+ new Object[] {FileFormat.ORC},
+ new Object[] {FileFormat.AVRO}
+ );
+ }
+
+ public TestProjectMetaColumn(FileFormat format) {
+ this.format = format;
+ }
+
+ private void testSkipToRemoveMetaColumn(int formatVersion) throws
IOException {
+ // Create the table with given format version.
+ String location = folder.getRoot().getAbsolutePath();
+ Table table = SimpleDataUtil.createTable(location,
+ ImmutableMap.of(TableProperties.FORMAT_VERSION,
String.valueOf(formatVersion)),
+ false);
+
+ List<RowData> rows = Lists.newArrayList(
+ SimpleDataUtil.createInsert(1, "AAA"),
+ SimpleDataUtil.createInsert(2, "BBB"),
+ SimpleDataUtil.createInsert(3, "CCC")
+ );
+ writeAndCommit(table, ImmutableList.of(), false, rows);
+
+ FlinkInputFormat input = FlinkSource
+ .forRowData()
+ .tableLoader(TableLoader.fromHadoopTable(location))
+ .buildFormat();
+
+ List<RowData> results = Lists.newArrayList();
+ TestHelpers.readRowData(input, rowData -> {
+      // When the meta columns do not need to be removed, the reader returns
plain GenericRowData rather than a RowDataProjection.
+ Assert.assertTrue(rowData instanceof GenericRowData);
+ results.add(TestHelpers.copyRowData(rowData, SimpleDataUtil.ROW_TYPE));
+ });
+
+ // Assert the results.
+ TestHelpers.assertRows(rows, results, SimpleDataUtil.ROW_TYPE);
+ }
+
+ @Test
+ public void testV1SkipToRemoveMetaColumn() throws IOException {
+ testSkipToRemoveMetaColumn(1);
+ }
+
+ @Test
+ public void testV2SkipToRemoveMetaColumn() throws IOException {
+ testSkipToRemoveMetaColumn(2);
+ }
+
+ @Test
+ public void testV2RemoveMetaColumn() throws Exception {
+ // Create the v2 table.
+ String location = folder.getRoot().getAbsolutePath();
+ Table table = SimpleDataUtil.createTable(location,
ImmutableMap.of(TableProperties.FORMAT_VERSION, "2"), false);
+
+ List<RowData> rows = Lists.newArrayList(
+ SimpleDataUtil.createInsert(1, "AAA"),
+ SimpleDataUtil.createDelete(1, "AAA"),
+ SimpleDataUtil.createInsert(2, "AAA"),
+ SimpleDataUtil.createInsert(2, "BBB")
+ );
+ int eqFieldId = table.schema().findField("data").fieldId();
+ writeAndCommit(table, ImmutableList.of(eqFieldId), true, rows);
+
+ FlinkInputFormat input = FlinkSource
+ .forRowData()
+ .tableLoader(TableLoader.fromHadoopTable(location))
+ .buildFormat();
+
+ List<RowData> results = Lists.newArrayList();
+ TestHelpers.readRowData(input, rowData -> {
+      // When the reader projects away the meta columns, it returns a
RowDataProjection instead of GenericRowData.
+ Assert.assertTrue(rowData instanceof RowDataProjection);
+ results.add(TestHelpers.copyRowData(rowData, SimpleDataUtil.ROW_TYPE));
+ });
+
+ // Assert the results.
+ TestHelpers.assertRows(ImmutableList.of(
+ SimpleDataUtil.createInsert(2, "AAA"),
+ SimpleDataUtil.createInsert(2, "BBB")
+ ), results, SimpleDataUtil.ROW_TYPE);
+ }
+
+ private void writeAndCommit(Table table, List<Integer> eqFieldIds, boolean
upsert, List<RowData> rows)
+ throws IOException {
+ TaskWriter<RowData> writer = createTaskWriter(table, eqFieldIds, upsert);
+ try (TaskWriter<RowData> io = writer) {
+ for (RowData row : rows) {
+ io.write(row);
+ }
+ }
+
+ RowDelta delta = table.newRowDelta();
+ WriteResult result = writer.complete();
+
+ for (DataFile dataFile : result.dataFiles()) {
+ delta.addRows(dataFile);
+ }
+
+ for (DeleteFile deleteFile : result.deleteFiles()) {
+ delta.addDeletes(deleteFile);
+ }
+
+ delta.commit();
+ }
+
+ private TaskWriter<RowData> createTaskWriter(Table table, List<Integer>
equalityFieldIds, boolean upsert) {
+ TaskWriterFactory<RowData> taskWriterFactory = new
RowDataTaskWriterFactory(
+ SerializableTable.copyOf(table),
+ SimpleDataUtil.ROW_TYPE,
+ TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
+ format,
+ equalityFieldIds,
+ upsert);
+
+ taskWriterFactory.initialize(1, 1);
+ return taskWriterFactory.create();
+ }
+}