This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new f356087156 Flink: Test both "new" Flink Avro planned reader and "deprecated" Avro reader (#11430)
f356087156 is described below

commit f3560871564c95a0e7b2bff3ca6ecb2e08726d01
Author: JB Onofré <[email protected]>
AuthorDate: Tue Nov 26 16:55:09 2024 +0100

    Flink: Test both "new" Flink Avro planned reader and "deprecated" Avro reader (#11430)
---
 ...java => AbstractTestFlinkAvroReaderWriter.java} | 11 ++--
 .../data/TestFlinkAvroDeprecatedReaderWriter.java  | 38 ++++++++++++++
 .../data/TestFlinkAvroPlannedReaderWriter.java     | 34 +++++++++++++
 .../iceberg/flink/data/TestRowProjection.java      | 58 +++++++++++++++-------
 4 files changed, 116 insertions(+), 25 deletions(-)

diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java
similarity index 96%
rename from flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
rename to flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java
index 2b9e8694b6..cbf49ae6fa 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroReaderWriter.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java
@@ -48,7 +48,7 @@ import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.DateTimeUtil;
 import org.junit.jupiter.api.Test;
 
-public class TestFlinkAvroReaderWriter extends DataTest {
+public abstract class AbstractTestFlinkAvroReaderWriter extends DataTest {
 
   private static final int NUM_RECORDS = 100;
 
@@ -70,6 +70,8 @@ public class TestFlinkAvroReaderWriter extends DataTest {
     writeAndValidate(schema, expectedRecords, NUM_RECORDS);
   }
 
+  protected abstract Avro.ReadBuilder createAvroReadBuilder(File recordsFile, 
Schema schema);
+
   private void writeAndValidate(Schema schema, List<Record> expectedRecords, 
int numRecord)
       throws IOException {
     RowType flinkSchema = FlinkSchemaUtil.convert(schema);
@@ -88,11 +90,7 @@ public class TestFlinkAvroReaderWriter extends DataTest {
       writer.addAll(expectedRecords);
     }
 
-    try (CloseableIterable<RowData> reader =
-        Avro.read(Files.localInput(recordsFile))
-            .project(schema)
-            .createResolvingReader(FlinkPlannedAvroReader::create)
-            .build()) {
+    try (CloseableIterable<RowData> reader = 
createAvroReadBuilder(recordsFile, schema).build()) {
       Iterator<Record> expected = expectedRecords.iterator();
       Iterator<RowData> rows = reader.iterator();
       for (int i = 0; i < numRecord; i++) {
@@ -156,7 +154,6 @@ public class TestFlinkAvroReaderWriter extends DataTest {
 
   @Test
   public void testNumericTypes() throws IOException {
-
     List<Record> expected =
         ImmutableList.of(
             recordNumType(
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroDeprecatedReaderWriter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroDeprecatedReaderWriter.java
new file mode 100644
index 0000000000..03910f4fda
--- /dev/null
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroDeprecatedReaderWriter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import java.io.File;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+
+/**
+ * @deprecated should be removed in 1.8.0; along with FlinkAvroReader.
+ */
+@Deprecated
+public class TestFlinkAvroDeprecatedReaderWriter extends 
AbstractTestFlinkAvroReaderWriter {
+
+  @Override
+  protected Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema 
schema) {
+    return Avro.read(Files.localInput(recordsFile))
+        .project(schema)
+        .createReaderFunc(FlinkAvroReader::new);
+  }
+}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroPlannedReaderWriter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroPlannedReaderWriter.java
new file mode 100644
index 0000000000..102a26a947
--- /dev/null
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkAvroPlannedReaderWriter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import java.io.File;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+
+public class TestFlinkAvroPlannedReaderWriter extends 
AbstractTestFlinkAvroReaderWriter {
+
+  @Override
+  protected Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema 
schema) {
+    return Avro.read(Files.localInput(recordsFile))
+        .project(schema)
+        .createResolvingReader(FlinkPlannedAvroReader::create);
+  }
+}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
index 3b6cf0c58f..f76e4c4942 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowProjection.java
@@ -24,6 +24,8 @@ import static org.assertj.core.api.Assertions.withPrecision;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.GenericArrayData;
@@ -32,6 +34,9 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -41,13 +46,23 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.types.Types;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
 
+@ExtendWith(ParameterizedTestExtension.class)
 public class TestRowProjection {
 
   @TempDir private Path temp;
 
+  @Parameter(index = 0)
+  protected Boolean useAvroPlannedReader;
+
+  @Parameters(name = "useAvroPlannedReader={0}")
+  protected static List<Object[]> parameters() {
+    return Arrays.asList(new Object[] {Boolean.FALSE}, new Object[] 
{Boolean.TRUE});
+  }
+
   private RowData writeAndRead(String desc, Schema writeSchema, Schema 
readSchema, RowData row)
       throws IOException {
     File file = File.createTempFile("junit", desc + ".avro", temp.toFile());
@@ -61,16 +76,23 @@ public class TestRowProjection {
       appender.add(row);
     }
 
-    Iterable<RowData> records =
+    Avro.ReadBuilder builder =
         Avro.read(Files.localInput(file))
             .project(readSchema)
-            .createResolvingReader(FlinkPlannedAvroReader::create)
-            .build();
+            .createReaderFunc(FlinkAvroReader::new);
+    if (useAvroPlannedReader) {
+      builder =
+          Avro.read(Files.localInput(file))
+              .project(readSchema)
+              .createResolvingReader(FlinkPlannedAvroReader::create);
+    }
+
+    Iterable<RowData> records = builder.build();
 
     return Iterables.getOnlyElement(records);
   }
 
-  @Test
+  @TestTemplate
   public void testFullProjection() throws Exception {
     Schema schema =
         new Schema(
@@ -85,7 +107,7 @@ public class TestRowProjection {
     assertThat(projected.getString(1)).asString().isEqualTo("test");
   }
 
-  @Test
+  @TestTemplate
   public void testSpecialCharacterProjection() throws Exception {
     Schema schema =
         new Schema(
@@ -105,7 +127,7 @@ public class TestRowProjection {
     assertThat(projected.getString(0)).asString().isEqualTo("test");
   }
 
-  @Test
+  @TestTemplate
   public void testReorderedFullProjection() throws Exception {
     Schema schema =
         new Schema(
@@ -125,7 +147,7 @@ public class TestRowProjection {
     assertThat(projected.getLong(1)).isEqualTo(34);
   }
 
-  @Test
+  @TestTemplate
   public void testReorderedProjection() throws Exception {
     Schema schema =
         new Schema(
@@ -147,7 +169,7 @@ public class TestRowProjection {
     assertThat(projected.isNullAt(2)).isTrue();
   }
 
-  @Test
+  @TestTemplate
   public void testRenamedAddedField() throws Exception {
     Schema schema =
         new Schema(
@@ -177,7 +199,7 @@ public class TestRowProjection {
     assertThat(projected.isNullAt(3)).as("Should contain empty value on new 
column 4").isTrue();
   }
 
-  @Test
+  @TestTemplate
   public void testEmptyProjection() throws Exception {
     Schema schema =
         new Schema(
@@ -192,7 +214,7 @@ public class TestRowProjection {
     assertThat(projected.getArity()).isEqualTo(0);
   }
 
-  @Test
+  @TestTemplate
   public void testBasicProjection() throws Exception {
     Schema writeSchema =
         new Schema(
@@ -216,7 +238,7 @@ public class TestRowProjection {
     assertThat(projected.getString(0)).asString().isEqualTo("test");
   }
 
-  @Test
+  @TestTemplate
   public void testRename() throws Exception {
     Schema writeSchema =
         new Schema(
@@ -239,7 +261,7 @@ public class TestRowProjection {
         .isEqualTo("test");
   }
 
-  @Test
+  @TestTemplate
   public void testNestedStructProjection() throws Exception {
     Schema writeSchema =
         new Schema(
@@ -305,7 +327,7 @@ public class TestRowProjection {
         .isEqualTo(-1.539054f, withPrecision(0.000001f));
   }
 
-  @Test
+  @TestTemplate
   public void testMapProjection() throws IOException {
     Schema writeSchema =
         new Schema(
@@ -359,7 +381,7 @@ public class TestRowProjection {
     return stringMap;
   }
 
-  @Test
+  @TestTemplate
   public void testMapOfStructsProjection() throws IOException {
     Schema writeSchema =
         new Schema(
@@ -459,7 +481,7 @@ public class TestRowProjection {
         .isEqualTo(52.995143f, withPrecision(0.000001f));
   }
 
-  @Test
+  @TestTemplate
   public void testListProjection() throws IOException {
     Schema writeSchema =
         new Schema(
@@ -488,7 +510,7 @@ public class TestRowProjection {
     assertThat(projected.getArray(0)).isEqualTo(values);
   }
 
-  @Test
+  @TestTemplate
   @SuppressWarnings("unchecked")
   public void testListOfStructsProjection() throws IOException {
     Schema writeSchema =
@@ -565,7 +587,7 @@ public class TestRowProjection {
     assertThat(projectedP2.isNullAt(0)).as("Should project null z").isTrue();
   }
 
-  @Test
+  @TestTemplate
   public void testAddedFieldsWithRequiredChildren() throws Exception {
     Schema schema = new Schema(Types.NestedField.required(1, "a", 
Types.LongType.get()));
 

Reply via email to