This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a24350  Flink 1.14: Add tests to check whether should remove meta 
columns in source reader (#3893)
0a24350 is described below

commit 0a24350d7c4fd86935f61e0d77ab27c49fa81e69
Author: liliwei <[email protected]>
AuthorDate: Thu Jan 20 10:34:43 2022 +0800

    Flink 1.14: Add tests to check whether should remove meta columns in source 
reader (#3893)
---
 .../java/org/apache/iceberg/flink/TestHelpers.java |  29 ++--
 .../flink/source/TestProjectMetaColumn.java        | 180 +++++++++++++++++++++
 2 files changed, 199 insertions(+), 10 deletions(-)

diff --git 
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java 
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
index 8e2af6e..e2bf306 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
@@ -31,6 +31,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -87,19 +88,23 @@ public class TestHelpers {
     return RowDataUtil.clone(from, null, rowType, fieldSerializers);
   }
 
-  public static List<RowData> readRowData(FlinkInputFormat inputFormat, 
RowType rowType) throws IOException {
-    FlinkInputSplit[] splits = inputFormat.createInputSplits(0);
-    List<RowData> results = Lists.newArrayList();
-
-    for (FlinkInputSplit s : splits) {
-      inputFormat.open(s);
-      while (!inputFormat.reachedEnd()) {
-        RowData row = inputFormat.nextRecord(null);
-        results.add(copyRowData(row, rowType));
+  public static void readRowData(FlinkInputFormat input, Consumer<RowData> 
visitor) throws IOException {
+    for (FlinkInputSplit s : input.createInputSplits(0)) {
+      input.open(s);
+      try {
+        while (!input.reachedEnd()) {
+          RowData row = input.nextRecord(null);
+          visitor.accept(row);
+        }
+      } finally {
+        input.close();
       }
     }
-    inputFormat.close();
+  }
 
+  public static List<RowData> readRowData(FlinkInputFormat inputFormat, 
RowType rowType) throws IOException {
+    List<RowData> results = Lists.newArrayList();
+    readRowData(inputFormat, row -> results.add(copyRowData(row, rowType)));
     return results;
   }
 
@@ -125,6 +130,10 @@ public class TestHelpers {
     assertRows(results, expected);
   }
 
+  public static void assertRows(List<RowData> results, List<RowData> expected, 
RowType rowType) {
+    assertRows(convertRowDataToRow(results, rowType), 
convertRowDataToRow(expected, rowType));
+  }
+
   public static void assertRows(List<Row> results, List<Row> expected) {
     expected.sort(Comparator.comparing(Row::toString));
     results.sort(Comparator.comparing(Row::toString));
diff --git 
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java
 
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java
new file mode 100644
index 0000000..baccadc
--- /dev/null
+++ 
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.flink.data.RowDataProjection;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestProjectMetaColumn {
+
+  @Rule
+  public final TemporaryFolder folder = new TemporaryFolder();
+  private final FileFormat format;
+
+  @Parameterized.Parameters(name = "fileFormat={0}")
+  public static Iterable<Object[]> parameters() {
+    return Lists.newArrayList(
+        new Object[] {FileFormat.PARQUET},
+        new Object[] {FileFormat.ORC},
+        new Object[] {FileFormat.AVRO}
+    );
+  }
+
+  public TestProjectMetaColumn(FileFormat format) {
+    this.format = format;
+  }
+
+  private void testSkipToRemoveMetaColumn(int formatVersion) throws 
IOException {
+    // Create the table with given format version.
+    String location = folder.getRoot().getAbsolutePath();
+    Table table = SimpleDataUtil.createTable(location,
+        ImmutableMap.of(TableProperties.FORMAT_VERSION, 
String.valueOf(formatVersion)),
+        false);
+
+    List<RowData> rows = Lists.newArrayList(
+        SimpleDataUtil.createInsert(1, "AAA"),
+        SimpleDataUtil.createInsert(2, "BBB"),
+        SimpleDataUtil.createInsert(3, "CCC")
+    );
+    writeAndCommit(table, ImmutableList.of(), false, rows);
+
+    FlinkInputFormat input = FlinkSource
+        .forRowData()
+        .tableLoader(TableLoader.fromHadoopTable(location))
+        .buildFormat();
+
+    List<RowData> results = Lists.newArrayList();
+    TestHelpers.readRowData(input, rowData -> {
+      // If project to remove the meta columns, it will get a 
RowDataProjection.
+      Assert.assertTrue(rowData instanceof GenericRowData);
+      results.add(TestHelpers.copyRowData(rowData, SimpleDataUtil.ROW_TYPE));
+    });
+
+    // Assert the results.
+    TestHelpers.assertRows(rows, results, SimpleDataUtil.ROW_TYPE);
+  }
+
+  @Test
+  public void testV1SkipToRemoveMetaColumn() throws IOException {
+    testSkipToRemoveMetaColumn(1);
+  }
+
+  @Test
+  public void testV2SkipToRemoveMetaColumn() throws IOException {
+    testSkipToRemoveMetaColumn(2);
+  }
+
+  @Test
+  public void testV2RemoveMetaColumn() throws Exception {
+    // Create the v2 table.
+    String location = folder.getRoot().getAbsolutePath();
+    Table table = SimpleDataUtil.createTable(location, 
ImmutableMap.of(TableProperties.FORMAT_VERSION, "2"), false);
+
+    List<RowData> rows = Lists.newArrayList(
+        SimpleDataUtil.createInsert(1, "AAA"),
+        SimpleDataUtil.createDelete(1, "AAA"),
+        SimpleDataUtil.createInsert(2, "AAA"),
+        SimpleDataUtil.createInsert(2, "BBB")
+    );
+    int eqFieldId = table.schema().findField("data").fieldId();
+    writeAndCommit(table, ImmutableList.of(eqFieldId), true, rows);
+
+    FlinkInputFormat input = FlinkSource
+        .forRowData()
+        .tableLoader(TableLoader.fromHadoopTable(location))
+        .buildFormat();
+
+    List<RowData> results = Lists.newArrayList();
+    TestHelpers.readRowData(input, rowData -> {
+      // If project to remove the meta columns, it will get a 
RowDataProjection.
+      Assert.assertTrue(rowData instanceof RowDataProjection);
+      results.add(TestHelpers.copyRowData(rowData, SimpleDataUtil.ROW_TYPE));
+    });
+
+    // Assert the results.
+    TestHelpers.assertRows(ImmutableList.of(
+        SimpleDataUtil.createInsert(2, "AAA"),
+        SimpleDataUtil.createInsert(2, "BBB")
+    ), results, SimpleDataUtil.ROW_TYPE);
+  }
+
+  private void writeAndCommit(Table table, List<Integer> eqFieldIds, boolean 
upsert, List<RowData> rows)
+      throws IOException {
+    TaskWriter<RowData> writer = createTaskWriter(table, eqFieldIds, upsert);
+    try (TaskWriter<RowData> io = writer) {
+      for (RowData row : rows) {
+        io.write(row);
+      }
+    }
+
+    RowDelta delta = table.newRowDelta();
+    WriteResult result = writer.complete();
+
+    for (DataFile dataFile : result.dataFiles()) {
+      delta.addRows(dataFile);
+    }
+
+    for (DeleteFile deleteFile : result.deleteFiles()) {
+      delta.addDeletes(deleteFile);
+    }
+
+    delta.commit();
+  }
+
+  private TaskWriter<RowData> createTaskWriter(Table table, List<Integer> 
equalityFieldIds, boolean upsert) {
+    TaskWriterFactory<RowData> taskWriterFactory = new 
RowDataTaskWriterFactory(
+        SerializableTable.copyOf(table),
+        SimpleDataUtil.ROW_TYPE,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
+        format,
+        equalityFieldIds,
+        upsert);
+
+    taskWriterFactory.initialize(1, 1);
+    return taskWriterFactory.create();
+  }
+}

Reply via email to