This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 8685985a03 Core, Spark 4.0, 3.5, 3.4: Remove usage of deprecated avro/DataReader class (#14387)
8685985a03 is described below
commit 8685985a039de97ff1ac834be2095e8c08be2482
Author: gaborkaszab <[email protected]>
AuthorDate: Fri Oct 31 08:39:56 2025 +0100
Core, Spark 4.0, 3.5, 3.4: Remove usage of deprecated avro/DataReader class (#14387)
---
.../org/apache/iceberg/data/avro/DataReader.java | 2 +-
.../spark/actions/RewriteTablePathSparkAction.java | 4 +-
.../spark/actions/TestRewriteTablePathsAction.java | 47 ++++++++++++++++++----
.../spark/actions/RewriteTablePathSparkAction.java | 4 +-
.../spark/actions/TestRewriteTablePathsAction.java | 47 ++++++++++++++++++----
.../spark/actions/RewriteTablePathSparkAction.java | 4 +-
.../spark/actions/TestRewriteTablePathsAction.java | 47 ++++++++++++++++++----
7 files changed, 124 insertions(+), 31 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
index 75929dfde4..5647253a5b 100644
--- a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
+++ b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
@@ -37,7 +37,7 @@ import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
/**
- * @deprecated will be removed in 2.0.0; use {@link PlannedDataReader} instead.
+ * @deprecated will be removed in 1.12.0; use {@link PlannedDataReader} instead.
*/
@Deprecated
public class DataReader<T> implements DatumReader<T>, SupportsRowPosition {
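
For callers migrating off the deprecated class, the replacement named in the
javadoc above is a small change at the Avro read-builder call site. A minimal
sketch of the before/after, mirroring the hunks below (the "projectedSchema"
and "inputFile" names are illustrative, not from this commit):

    // Before: the deprecated DataReader derives its structure from the
    // Avro file schema passed to the reader function.
    Avro.read(inputFile)
        .project(projectedSchema)
        .createReaderFunc(DataReader::create)
        .build();

    // After: PlannedDataReader plans the read against the expected
    // (projected) Iceberg schema, so the file-schema argument of the
    // reader function goes unused.
    Avro.read(inputFile)
        .project(projectedSchema)
        .createReaderFunc(fileSchema -> PlannedDataReader.create(projectedSchema))
        .build();
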
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
index fc0dc298ae..d6a13bcd51 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -51,8 +51,8 @@ import org.apache.iceberg.actions.ImmutableRewriteTablePath;
import org.apache.iceberg.actions.RewriteTablePath;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.avro.PlannedDataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.orc.GenericOrcWriter;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
@@ -725,7 +725,7 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
return Avro.read(inputFile)
.project(deleteSchema)
.reuseContainers()
- .createReaderFunc(DataReader::create)
+ .createReaderFunc(fileSchema -> PlannedDataReader.create(deleteSchema))
.build();
case PARQUET:
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index 8399ae5d52..6dac5d5da0 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -387,28 +387,59 @@ public class TestRewriteTablePathsAction extends TestBase {
}
@Test
- public void testPositionDeletes() throws Exception {
+ public void testPositionDeletesParquet() throws Exception {
+ runPositionDeletesTest("parquet");
+ }
+
+ @Test
+ public void testPositionDeletesAvro() throws Exception {
+ runPositionDeletesTest("avro");
+ }
+
+ @Test
+ public void testPositionDeletesOrc() throws Exception {
+ runPositionDeletesTest("orc");
+ }
+
+ private void runPositionDeletesTest(String fileFormat) throws Exception {
+ Table tableWithPosDeletes =
+ createTableWithSnapshots(
+ tableDir.toFile().toURI().toString().concat("tableWithPosDeletes").concat(fileFormat),
+ 2,
+ Map.of(TableProperties.DELETE_DEFAULT_FILE_FORMAT, fileFormat));
+
List<Pair<CharSequence, Long>> deletes =
Lists.newArrayList(
Pair.of(
- table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location(),
+ tableWithPosDeletes
+ .currentSnapshot()
+ .addedDataFiles(tableWithPosDeletes.io())
+ .iterator()
+ .next()
+ .location(),
0L));
- File file = new File(removePrefix(table.location() + "/data/deeply/nested/deletes.parquet"));
+ File file =
+ new File(
+ removePrefix(
+ tableWithPosDeletes.location() + "/data/deeply/nested/deletes." + fileFormat));
DeleteFile positionDeletes =
FileHelpers.writeDeleteFile(
- table, table.io().newOutputFile(file.toURI().toString()), deletes)
+ tableWithPosDeletes,
+ tableWithPosDeletes.io().newOutputFile(file.toURI().toString()),
+ deletes)
.first();
- table.newRowDelta().addDeletes(positionDeletes).commit();
+ tableWithPosDeletes.newRowDelta().addDeletes(positionDeletes).commit();
- assertThat(spark.read().format("iceberg").load(table.location()).collectAsList()).hasSize(1);
+ assertThat(spark.read().format("iceberg").load(tableWithPosDeletes.location()).collectAsList())
+ .hasSize(1);
RewriteTablePath.Result result =
actions()
- .rewriteTablePath(table)
+ .rewriteTablePath(tableWithPosDeletes)
.stagingLocation(stagingLocation())
- .rewriteLocationPrefix(table.location(), targetTableLocation())
+ .rewriteLocationPrefix(tableWithPosDeletes.location(), targetTableLocation())
.execute();
// We have one more snapshot, an additional manifest list, and a new (delete) manifest,
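
The new runPositionDeletesTest helper drives the delete-file format through a
table property rather than hard-coding Parquet. A minimal sketch of the knob
it relies on (the property constant is from Iceberg's TableProperties; the
standalone updateProperties call is illustrative, since the tests above set
the property at table creation instead):

    // write.delete.format.default selects the format used for delete files,
    // independently of the data-file format.
    table
        .updateProperties()
        .set(TableProperties.DELETE_DEFAULT_FILE_FORMAT, "avro")
        .commit();
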
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
index fc0dc298ae..d6a13bcd51 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -51,8 +51,8 @@ import org.apache.iceberg.actions.ImmutableRewriteTablePath;
import org.apache.iceberg.actions.RewriteTablePath;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.avro.PlannedDataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.orc.GenericOrcWriter;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
@@ -725,7 +725,7 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
return Avro.read(inputFile)
.project(deleteSchema)
.reuseContainers()
- .createReaderFunc(DataReader::create)
+ .createReaderFunc(fileSchema -> PlannedDataReader.create(deleteSchema))
.build();
case PARQUET:
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index 8399ae5d52..6dac5d5da0 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -387,28 +387,59 @@ public class TestRewriteTablePathsAction extends TestBase {
}
@Test
- public void testPositionDeletes() throws Exception {
+ public void testPositionDeletesParquet() throws Exception {
+ runPositionDeletesTest("parquet");
+ }
+
+ @Test
+ public void testPositionDeletesAvro() throws Exception {
+ runPositionDeletesTest("avro");
+ }
+
+ @Test
+ public void testPositionDeletesOrc() throws Exception {
+ runPositionDeletesTest("orc");
+ }
+
+ private void runPositionDeletesTest(String fileFormat) throws Exception {
+ Table tableWithPosDeletes =
+ createTableWithSnapshots(
+ tableDir.toFile().toURI().toString().concat("tableWithPosDeletes").concat(fileFormat),
+ 2,
+ Map.of(TableProperties.DELETE_DEFAULT_FILE_FORMAT, fileFormat));
+
List<Pair<CharSequence, Long>> deletes =
Lists.newArrayList(
Pair.of(
- table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location(),
+ tableWithPosDeletes
+ .currentSnapshot()
+ .addedDataFiles(tableWithPosDeletes.io())
+ .iterator()
+ .next()
+ .location(),
0L));
- File file = new File(removePrefix(table.location() + "/data/deeply/nested/deletes.parquet"));
+ File file =
+ new File(
+ removePrefix(
+ tableWithPosDeletes.location() + "/data/deeply/nested/deletes." + fileFormat));
DeleteFile positionDeletes =
FileHelpers.writeDeleteFile(
- table, table.io().newOutputFile(file.toURI().toString()), deletes)
+ tableWithPosDeletes,
+ tableWithPosDeletes.io().newOutputFile(file.toURI().toString()),
+ deletes)
.first();
- table.newRowDelta().addDeletes(positionDeletes).commit();
+ tableWithPosDeletes.newRowDelta().addDeletes(positionDeletes).commit();
- assertThat(spark.read().format("iceberg").load(table.location()).collectAsList()).hasSize(1);
+ assertThat(spark.read().format("iceberg").load(tableWithPosDeletes.location()).collectAsList())
+ .hasSize(1);
RewriteTablePath.Result result =
actions()
- .rewriteTablePath(table)
+ .rewriteTablePath(tableWithPosDeletes)
.stagingLocation(stagingLocation())
- .rewriteLocationPrefix(table.location(), targetTableLocation())
+ .rewriteLocationPrefix(tableWithPosDeletes.location(), targetTableLocation())
.execute();
// We have one more snapshot, an additional manifest list, and a new (delete) manifest,
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
index fc0dc298ae..d6a13bcd51 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -51,8 +51,8 @@ import org.apache.iceberg.actions.ImmutableRewriteTablePath;
import org.apache.iceberg.actions.RewriteTablePath;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.avro.PlannedDataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.orc.GenericOrcWriter;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
@@ -725,7 +725,7 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
return Avro.read(inputFile)
.project(deleteSchema)
.reuseContainers()
- .createReaderFunc(DataReader::create)
+ .createReaderFunc(fileSchema -> PlannedDataReader.create(deleteSchema))
.build();
case PARQUET:
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
index 8399ae5d52..6dac5d5da0 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java
@@ -387,28 +387,59 @@ public class TestRewriteTablePathsAction extends TestBase {
}
@Test
- public void testPositionDeletes() throws Exception {
+ public void testPositionDeletesParquet() throws Exception {
+ runPositionDeletesTest("parquet");
+ }
+
+ @Test
+ public void testPositionDeletesAvro() throws Exception {
+ runPositionDeletesTest("avro");
+ }
+
+ @Test
+ public void testPositionDeletesOrc() throws Exception {
+ runPositionDeletesTest("orc");
+ }
+
+ private void runPositionDeletesTest(String fileFormat) throws Exception {
+ Table tableWithPosDeletes =
+ createTableWithSnapshots(
+ tableDir.toFile().toURI().toString().concat("tableWithPosDeletes").concat(fileFormat),
+ 2,
+ Map.of(TableProperties.DELETE_DEFAULT_FILE_FORMAT, fileFormat));
+
List<Pair<CharSequence, Long>> deletes =
Lists.newArrayList(
Pair.of(
- table.currentSnapshot().addedDataFiles(table.io()).iterator().next().location(),
+ tableWithPosDeletes
+ .currentSnapshot()
+ .addedDataFiles(tableWithPosDeletes.io())
+ .iterator()
+ .next()
+ .location(),
0L));
- File file = new File(removePrefix(table.location() + "/data/deeply/nested/deletes.parquet"));
+ File file =
+ new File(
+ removePrefix(
+ tableWithPosDeletes.location() + "/data/deeply/nested/deletes." + fileFormat));
DeleteFile positionDeletes =
FileHelpers.writeDeleteFile(
- table, table.io().newOutputFile(file.toURI().toString()), deletes)
+ tableWithPosDeletes,
+ tableWithPosDeletes.io().newOutputFile(file.toURI().toString()),
+ deletes)
.first();
- table.newRowDelta().addDeletes(positionDeletes).commit();
+ tableWithPosDeletes.newRowDelta().addDeletes(positionDeletes).commit();
- assertThat(spark.read().format("iceberg").load(table.location()).collectAsList()).hasSize(1);
+ assertThat(spark.read().format("iceberg").load(tableWithPosDeletes.location()).collectAsList())
+ .hasSize(1);
RewriteTablePath.Result result =
actions()
- .rewriteTablePath(table)
+ .rewriteTablePath(tableWithPosDeletes)
.stagingLocation(stagingLocation())
- .rewriteLocationPrefix(table.location(), targetTableLocation())
+ .rewriteLocationPrefix(tableWithPosDeletes.location(), targetTableLocation())
.execute();
// We have one more snapshot, an additional manifest list, and a new (delete) manifest,