This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new e34ec24849 Core, Flink, Spark: Further deprecations for the positional deletes with row data (#14210)
e34ec24849 is described below
commit e34ec248495cdee6259c35c2f44807b2d910dc4e
Author: pvary <[email protected]>
AuthorDate: Thu Oct 16 07:11:33 2025 +0200
Core, Flink, Spark: Further deprecations for the positional deletes with row data (#14210)
---
.../apache/iceberg/data/BaseFileWriterFactory.java | 34 ++++++++++++++
.../iceberg/data/GenericAppenderFactory.java | 43 +++++++++++++++++
.../iceberg/data/GenericFileWriterFactory.java | 43 +++++++++++++++--
.../iceberg/data/TestGenericFileWriterFactory.java | 9 +++-
.../org/apache/iceberg/io/TestWriterMetrics.java | 54 +++++++++-------------
.../iceberg/flink/sink/FlinkAppenderFactory.java | 17 +++++++
.../iceberg/flink/sink/FlinkFileWriterFactory.java | 51 +-------------------
.../flink/sink/TestFlinkFileWriterFactory.java | 9 +++-
.../flink/sink/TestFlinkPartitioningWriters.java | 1 -
.../flink/sink/TestFlinkPositionDeltaWriters.java | 1 -
.../flink/sink/TestFlinkRollingFileWriters.java | 1 -
.../iceberg/flink/sink/TestFlinkWriterMetrics.java | 18 ++++++--
.../flink/source/TestFlinkMergingMetrics.java | 1 -
.../iceberg/flink/sink/FlinkAppenderFactory.java | 17 +++++++
.../iceberg/flink/sink/FlinkFileWriterFactory.java | 51 +-------------------
.../flink/sink/TestFlinkFileWriterFactory.java | 9 +++-
.../flink/sink/TestFlinkPartitioningWriters.java | 1 -
.../flink/sink/TestFlinkPositionDeltaWriters.java | 1 -
.../flink/sink/TestFlinkRollingFileWriters.java | 1 -
.../iceberg/flink/sink/TestFlinkWriterMetrics.java | 18 ++++++--
.../flink/source/TestFlinkMergingMetrics.java | 1 -
.../iceberg/flink/sink/FlinkAppenderFactory.java | 17 +++++++
.../iceberg/flink/sink/FlinkFileWriterFactory.java | 51 +-------------------
.../flink/sink/TestFlinkFileWriterFactory.java | 9 +++-
.../flink/sink/TestFlinkPartitioningWriters.java | 1 -
.../flink/sink/TestFlinkPositionDeltaWriters.java | 1 -
.../flink/sink/TestFlinkRollingFileWriters.java | 1 -
.../iceberg/flink/sink/TestFlinkWriterMetrics.java | 18 ++++++--
.../flink/source/TestFlinkMergingMetrics.java | 1 -
.../spark/source/SparkFileWriterFactory.java | 47 +++++++++++++++++++
.../spark/source/SparkPositionDeltaWrite.java | 5 +-
.../spark/source/SparkFileWriterFactory.java | 47 +++++++++++++++++++
.../spark/source/SparkPositionDeltaWrite.java | 5 +-
.../spark/source/SparkFileWriterFactory.java | 47 +++++++++++++++++++
.../spark/source/SparkPositionDeltaWrite.java | 5 +-
35 files changed, 422 insertions(+), 214 deletions(-)
diff --git a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
index 64aafbc850..818fa0b784 100644
--- a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
+++ b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
@@ -53,6 +53,35 @@ public abstract class BaseFileWriterFactory<T> implements
FileWriterFactory<T>,
private final Schema positionDeleteRowSchema;
private final Map<String, String> writerProperties;
+ protected BaseFileWriterFactory(
+ Table table,
+ FileFormat dataFileFormat,
+ Schema dataSchema,
+ SortOrder dataSortOrder,
+ FileFormat deleteFileFormat,
+ int[] equalityFieldIds,
+ Schema equalityDeleteRowSchema,
+ SortOrder equalityDeleteSortOrder,
+ Map<String, String> writerProperties) {
+ this.table = table;
+ this.dataFileFormat = dataFileFormat;
+ this.dataSchema = dataSchema;
+ this.dataSortOrder = dataSortOrder;
+ this.deleteFileFormat = deleteFileFormat;
+ this.equalityFieldIds = equalityFieldIds;
+ this.equalityDeleteRowSchema = equalityDeleteRowSchema;
+ this.equalityDeleteSortOrder = equalityDeleteSortOrder;
+ this.writerProperties = writerProperties;
+ this.positionDeleteRowSchema = null;
+ }
+
+ /**
+ * @deprecated This constructor is deprecated as of version 1.11.0 and will
be removed in 1.12.0.
+ * Position deletes that include row data are no longer supported. Use
{@link
+ * #BaseFileWriterFactory(Table, FileFormat, Schema, SortOrder,
FileFormat, int[], Schema,
+ * SortOrder, Map)} instead.
+ */
+ @Deprecated
protected BaseFileWriterFactory(
Table table,
FileFormat dataFileFormat,
@@ -333,6 +362,11 @@ public abstract class BaseFileWriterFactory<T> implements
FileWriterFactory<T>,
return equalityDeleteRowSchema;
}
+ /**
+ * @deprecated This method is deprecated as of version 1.11.0 and will be
removed in 1.12.0.
+ * Position deletes that include row data are no longer supported.
+ */
+ @Deprecated
protected Schema positionDeleteRowSchema() {
return positionDeleteRowSchema;
}
diff --git a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java
index 51b52c7dcb..6c32f0f545 100644
--- a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java
+++ b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java
@@ -62,6 +62,12 @@ public class GenericAppenderFactory implements
FileAppenderFactory<Record> {
this(schema, spec, null, null, null);
}
+ /**
+ * @deprecated This constructor is deprecated as of version 1.11.0 and will
be removed in 1.12.0.
+ * Position deletes that include row data are no longer supported. Use
{@link
+ * #GenericAppenderFactory(Schema, PartitionSpec, int[], Schema)}
instead.
+ */
+ @Deprecated
public GenericAppenderFactory(
Schema schema,
PartitionSpec spec,
@@ -71,6 +77,39 @@ public class GenericAppenderFactory implements
FileAppenderFactory<Record> {
this(null, schema, spec, null, equalityFieldIds, eqDeleteRowSchema,
posDeleteRowSchema);
}
+ /**
+ * Constructor for GenericAppenderFactory.
+ *
+ * @param schema the schema of the records to write
+ * @param spec the partition spec of the records
+ * @param equalityFieldIds the field ids for equality delete
+ * @param eqDeleteRowSchema the schema for equality delete rows
+ */
+ public GenericAppenderFactory(
+ Schema schema, PartitionSpec spec, int[] equalityFieldIds, Schema
eqDeleteRowSchema) {
+ this(null, schema, spec, null, equalityFieldIds, eqDeleteRowSchema, null);
+ }
+
+ /**
+ * Constructor for GenericAppenderFactory.
+ *
+ * @param table iceberg table
+ * @param schema the schema of the records to write
+ * @param spec the partition spec of the records
+ * @param config the configuration for the writer
+ * @param equalityFieldIds the field ids for equality delete
+ * @param eqDeleteRowSchema the schema for equality delete rows
+ */
+ public GenericAppenderFactory(
+ Table table,
+ Schema schema,
+ PartitionSpec spec,
+ Map<String, String> config,
+ int[] equalityFieldIds,
+ Schema eqDeleteRowSchema) {
+ this(table, schema, spec, config, equalityFieldIds, eqDeleteRowSchema,
null);
+ }
+
/**
* Constructor for GenericAppenderFactory.
*
@@ -81,7 +120,11 @@ public class GenericAppenderFactory implements
FileAppenderFactory<Record> {
* @param equalityFieldIds the field ids for equality delete
* @param eqDeleteRowSchema the schema for equality delete rows
* @param posDeleteRowSchema the schema for position delete rows
+ * @deprecated This constructor is deprecated as of version 1.11.0 and will
be removed in 1.12.0.
+ * Position deletes that include row data are no longer supported. Use
{@link
+ * #GenericAppenderFactory(Table, Schema, PartitionSpec, Map, int[],
Schema)} instead.
*/
+ @Deprecated
public GenericAppenderFactory(
Table table,
Schema schema,
diff --git a/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java
index 58ea9bafd6..a9c39bfd85 100644
--- a/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java
+++ b/data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java
@@ -38,6 +38,34 @@ import
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
public class GenericFileWriterFactory extends BaseFileWriterFactory<Record> {
+ GenericFileWriterFactory(
+ Table table,
+ FileFormat dataFileFormat,
+ Schema dataSchema,
+ SortOrder dataSortOrder,
+ FileFormat deleteFileFormat,
+ int[] equalityFieldIds,
+ Schema equalityDeleteRowSchema,
+ SortOrder equalityDeleteSortOrder) {
+ super(
+ table,
+ dataFileFormat,
+ dataSchema,
+ dataSortOrder,
+ deleteFileFormat,
+ equalityFieldIds,
+ equalityDeleteRowSchema,
+ equalityDeleteSortOrder,
+ ImmutableMap.of());
+ }
+
+ /**
+ * @deprecated This constructor is deprecated as of version 1.11.0 and will
be removed in 1.12.0.
+ * Position deletes that include row data are no longer supported. Use
{@link
+ * #GenericFileWriterFactory(Table, FileFormat, Schema, SortOrder,
FileFormat, int[], Schema,
+ * SortOrder)} instead.
+ */
+ @Deprecated
GenericFileWriterFactory(
Table table,
FileFormat dataFileFormat,
@@ -199,14 +227,19 @@ public class GenericFileWriterFactory extends
BaseFileWriterFactory<Record> {
return this;
}
- /** Sets default writer properties. */
- public Builder writerProperties(Map<String, String> newWriterProperties) {
- this.writerProperties = newWriterProperties;
+ /**
+ * @deprecated This method is deprecated as of version 1.11.0 and will be
removed in 1.12.0.
+ * Position deletes that include row data are no longer supported.
+ */
+ @Deprecated
+ Builder positionDeleteRowSchema(Schema newPositionDeleteRowSchema) {
+ this.positionDeleteRowSchema = newPositionDeleteRowSchema;
return this;
}
- Builder positionDeleteRowSchema(Schema newPositionDeleteRowSchema) {
- this.positionDeleteRowSchema = newPositionDeleteRowSchema;
+ /** Sets default writer properties. */
+ public Builder writerProperties(Map<String, String> newWriterProperties) {
+ this.writerProperties = newWriterProperties;
return this;
}
diff --git a/data/src/test/java/org/apache/iceberg/data/TestGenericFileWriterFactory.java b/data/src/test/java/org/apache/iceberg/data/TestGenericFileWriterFactory.java
index 2ab908c8c7..fc8d420742 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestGenericFileWriterFactory.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestGenericFileWriterFactory.java
@@ -18,12 +18,15 @@
*/
package org.apache.iceberg.data;
+import java.io.IOException;
import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.TestFileWriterFactory;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.StructLikeSet;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
public class TestGenericFileWriterFactory extends
TestFileWriterFactory<Record> {
@@ -39,7 +42,6 @@ public class TestGenericFileWriterFactory extends
TestFileWriterFactory<Record>
.deleteFileFormat(format())
.equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
.equalityDeleteRowSchema(equalityDeleteRowSchema)
- .positionDeleteRowSchema(positionDeleteRowSchema)
.build();
}
@@ -57,4 +59,9 @@ public class TestGenericFileWriterFactory extends
TestFileWriterFactory<Record>
records.forEach(set::add);
return set;
}
+
+ @Disabled("Position deletes with row data are no longer supported")
+ @Override
+ @TestTemplate
+ public void testPositionDeleteWriterWithRow() throws IOException {}
}
diff --git a/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java b/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
index 5665fd0171..4722ae9895 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
@@ -169,34 +169,18 @@ public abstract class TestWriterMetrics<T> {
// structField.longValue)
Map<Integer, ByteBuffer> lowerBounds = deleteFile.lowerBounds();
-
assertThat(Conversions.<T>fromByteBuffer(Types.StringType.get(),
lowerBounds.get(pathFieldId)))
.isEqualTo(CharBuffer.wrap("File A"));
assertThat((long) Conversions.fromByteBuffer(Types.LongType.get(),
lowerBounds.get(posFieldId)))
.isEqualTo(1L);
-
- assertThat((int) Conversions.fromByteBuffer(Types.IntegerType.get(),
lowerBounds.get(1)))
- .isEqualTo(3);
- assertThat(lowerBounds).doesNotContainKey(2);
- assertThat(lowerBounds).doesNotContainKey(3);
- assertThat(lowerBounds).doesNotContainKey(4);
- assertThat((long) Conversions.fromByteBuffer(Types.LongType.get(),
lowerBounds.get(5)))
- .isEqualTo(3L);
+ checkRowStatistics(lowerBounds);
Map<Integer, ByteBuffer> upperBounds = deleteFile.upperBounds();
-
assertThat(Conversions.<T>fromByteBuffer(Types.StringType.get(),
upperBounds.get(pathFieldId)))
.isEqualTo(CharBuffer.wrap("File A"));
assertThat((long) Conversions.fromByteBuffer(Types.LongType.get(),
upperBounds.get(posFieldId)))
.isEqualTo(1L);
-
- assertThat((int) Conversions.fromByteBuffer(Types.IntegerType.get(),
upperBounds.get(1)))
- .isEqualTo(3);
- assertThat(upperBounds).doesNotContainKey(2);
- assertThat(upperBounds).doesNotContainKey(3);
- assertThat(upperBounds).doesNotContainKey(4);
- assertThat((long) Conversions.fromByteBuffer(Types.LongType.get(),
upperBounds.get(5)))
- .isEqualTo(3L);
+ checkRowStatistics(upperBounds);
}
@TestTemplate
@@ -222,19 +206,8 @@ public abstract class TestWriterMetrics<T> {
DeleteFile deleteFile = deleteWriter.toDeleteFile();
// should have NO bounds for path and position as the file covers multiple
data paths
- Map<Integer, ByteBuffer> lowerBounds = deleteFile.lowerBounds();
- assertThat(lowerBounds).hasSize(2);
- assertThat((int) Conversions.fromByteBuffer(Types.IntegerType.get(),
lowerBounds.get(1)))
- .isEqualTo(3);
- assertThat((long) Conversions.fromByteBuffer(Types.LongType.get(),
lowerBounds.get(5)))
- .isEqualTo(3L);
-
- Map<Integer, ByteBuffer> upperBounds = deleteFile.upperBounds();
- assertThat(upperBounds).hasSize(2);
- assertThat((int) Conversions.fromByteBuffer(Types.IntegerType.get(),
upperBounds.get(1)))
- .isEqualTo(3);
- assertThat((long) Conversions.fromByteBuffer(Types.LongType.get(),
upperBounds.get(5)))
- .isEqualTo(3L);
+ checkNotExistingRowStatistics(deleteFile.lowerBounds());
+ checkNotExistingRowStatistics(deleteFile.upperBounds());
}
@TestTemplate
@@ -378,4 +351,23 @@ public abstract class TestWriterMetrics<T> {
.isEqualTo(1);
}
}
+
+ protected void checkRowStatistics(Map<Integer, ByteBuffer> bounds) {
+ assertThat(bounds).hasSize(4);
+ assertThat((int) Conversions.fromByteBuffer(Types.IntegerType.get(),
bounds.get(1)))
+ .isEqualTo(3);
+ assertThat(bounds).doesNotContainKey(2);
+ assertThat(bounds).doesNotContainKey(3);
+ assertThat(bounds).doesNotContainKey(4);
+ assertThat((long) Conversions.fromByteBuffer(Types.LongType.get(),
bounds.get(5)))
+ .isEqualTo(3L);
+ }
+
+ protected void checkNotExistingRowStatistics(Map<Integer, ByteBuffer>
bounds) {
+ assertThat(bounds).hasSize(2);
+ assertThat((int) Conversions.fromByteBuffer(Types.IntegerType.get(),
bounds.get(1)))
+ .isEqualTo(3);
+ assertThat((long) Conversions.fromByteBuffer(Types.LongType.get(),
bounds.get(5)))
+ .isEqualTo(3L);
+ }
}
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index d21bbec81f..07a068391c 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -66,6 +66,23 @@ public class FlinkAppenderFactory implements
FileAppenderFactory<RowData>, Seria
private RowType eqDeleteFlinkSchema = null;
private RowType posDeleteFlinkSchema = null;
+ public FlinkAppenderFactory(
+ Table table,
+ Schema schema,
+ RowType flinkSchema,
+ Map<String, String> props,
+ PartitionSpec spec,
+ int[] equalityFieldIds,
+ Schema eqDeleteRowSchema) {
+ this(table, schema, flinkSchema, props, spec, equalityFieldIds,
eqDeleteRowSchema, null);
+ }
+
+ /**
+ * @deprecated This constructor is deprecated as of version 1.11.0 and will
be removed in 1.12.0.
+ * Position deletes that include row data are no longer supported. Use
{@link
+ * #FlinkAppenderFactory(Table, Schema, RowType, Map, PartitionSpec,
int[], Schema)} instead.
+ */
+ @Deprecated
public FlinkAppenderFactory(
Table table,
Schema schema,
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
index 948dd252d5..b3ada41737 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
@@ -18,7 +18,6 @@
*/
package org.apache.iceberg.flink.sink;
-import static org.apache.iceberg.MetadataColumns.DELETE_FILE_ROW_FIELD_NAME;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT;
@@ -38,7 +37,6 @@ import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.data.FlinkAvroWriter;
import org.apache.iceberg.flink.data.FlinkOrcWriter;
import org.apache.iceberg.flink.data.FlinkParquetWriters;
-import org.apache.iceberg.io.DeleteSchemaUtil;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -47,7 +45,6 @@ import
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
public class FlinkFileWriterFactory extends BaseFileWriterFactory<RowData>
implements Serializable {
private RowType dataFlinkType;
private RowType equalityDeleteFlinkType;
- private RowType positionDeleteFlinkType;
private FlinkFileWriterFactory(
Table table,
@@ -60,8 +57,6 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
Schema equalityDeleteRowSchema,
RowType equalityDeleteFlinkType,
SortOrder equalityDeleteSortOrder,
- Schema positionDeleteRowSchema,
- RowType positionDeleteFlinkType,
Map<String, String> writeProperties) {
super(
@@ -73,12 +68,10 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
equalityFieldIds,
equalityDeleteRowSchema,
equalityDeleteSortOrder,
- positionDeleteRowSchema,
writeProperties);
this.dataFlinkType = dataFlinkType;
this.equalityDeleteFlinkType = equalityDeleteFlinkType;
- this.positionDeleteFlinkType = positionDeleteFlinkType;
}
static Builder builderFor(Table table) {
@@ -96,15 +89,7 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
}
@Override
- protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
- int rowFieldIndex =
positionDeleteFlinkType().getFieldIndex(DELETE_FILE_ROW_FIELD_NAME);
- if (rowFieldIndex >= 0) {
- // FlinkAvroWriter accepts just the Flink type of the row ignoring the
path and pos
- RowType positionDeleteRowFlinkType =
- (RowType) positionDeleteFlinkType().getTypeAt(rowFieldIndex);
- builder.createWriterFunc(ignored -> new
FlinkAvroWriter(positionDeleteRowFlinkType));
- }
- }
+ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {}
@Override
protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
@@ -119,8 +104,6 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
@Override
protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
- builder.createWriterFunc(
- msgType -> FlinkParquetWriters.buildWriter(positionDeleteFlinkType(),
msgType));
builder.transformPaths(path -> StringData.fromString(path.toString()));
}
@@ -138,8 +121,6 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
@Override
protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
- builder.createWriterFunc(
- (iSchema, typDesc) ->
FlinkOrcWriter.buildWriter(positionDeleteFlinkType(), iSchema));
builder.transformPaths(path -> StringData.fromString(path.toString()));
}
@@ -162,17 +143,6 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
return equalityDeleteFlinkType;
}
- private RowType positionDeleteFlinkType() {
- if (positionDeleteFlinkType == null) {
- // wrap the optional row schema into the position delete schema that
contains path and
- // position
- Schema positionDeleteSchema =
DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema());
- this.positionDeleteFlinkType =
FlinkSchemaUtil.convert(positionDeleteSchema);
- }
-
- return positionDeleteFlinkType;
- }
-
public static class Builder {
private final Table table;
private FileFormat dataFileFormat;
@@ -184,8 +154,6 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
private Schema equalityDeleteRowSchema;
private RowType equalityDeleteFlinkType;
private SortOrder equalityDeleteSortOrder;
- private Schema positionDeleteRowSchema;
- private RowType positionDeleteFlinkType;
private Map<String, String> writerProperties = ImmutableMap.of();
public Builder(Table table) {
@@ -257,21 +225,6 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
return this;
}
- public Builder positionDeleteRowSchema(Schema newPositionDeleteRowSchema) {
- this.positionDeleteRowSchema = newPositionDeleteRowSchema;
- return this;
- }
-
- /**
- * Sets a Flink type for position deletes.
- *
- * <p>If not set, the value is derived from the provided Iceberg schema.
- */
- public Builder positionDeleteFlinkType(RowType newPositionDeleteFlinkType)
{
- this.positionDeleteFlinkType = newPositionDeleteFlinkType;
- return this;
- }
-
/** Sets default writer properties. */
public Builder writerProperties(Map<String, String> newWriterProperties) {
this.writerProperties = newWriterProperties;
@@ -296,8 +249,6 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
equalityDeleteRowSchema,
equalityDeleteFlinkType,
equalityDeleteSortOrder,
- positionDeleteRowSchema,
- positionDeleteFlinkType,
writerProperties);
}
}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java
index 414ee40d13..83997093af 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.sink;
+import java.io.IOException;
import java.util.List;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
@@ -29,6 +30,8 @@ import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.TestFileWriterFactory;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.StructLikeSet;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
public class TestFlinkFileWriterFactory extends TestFileWriterFactory<RowData>
{
@@ -44,7 +47,6 @@ public class TestFlinkFileWriterFactory extends
TestFileWriterFactory<RowData> {
.deleteFileFormat(format())
.equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
.equalityDeleteRowSchema(equalityDeleteRowSchema)
- .positionDeleteRowSchema(positionDeleteRowSchema)
.build();
}
@@ -63,4 +65,9 @@ public class TestFlinkFileWriterFactory extends
TestFileWriterFactory<RowData> {
}
return set;
}
+
+ @Disabled("Position deletes with row data are no longer supported")
+ @Override
+ @TestTemplate
+ public void testPositionDeleteWriterWithRow() throws IOException {}
}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
index 939ed2be7d..ade4f15f72 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
@@ -55,7 +55,6 @@ public class TestFlinkPartitioningWriters<T> extends
TestPartitioningWriters<Row
.deleteFileFormat(format())
.equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
.equalityDeleteRowSchema(equalityDeleteRowSchema)
- .positionDeleteRowSchema(positionDeleteRowSchema)
.build();
}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPositionDeltaWriters.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPositionDeltaWriters.java
index 3050752d1c..e4212ffa7d 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPositionDeltaWriters.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPositionDeltaWriters.java
@@ -44,7 +44,6 @@ public class TestFlinkPositionDeltaWriters extends
TestPositionDeltaWriters<RowD
.deleteFileFormat(format())
.equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
.equalityDeleteRowSchema(equalityDeleteRowSchema)
- .positionDeleteRowSchema(positionDeleteRowSchema)
.build();
}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
index 03051b69cf..b8f37453e6 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
@@ -40,7 +40,6 @@ public class TestFlinkRollingFileWriters extends
TestRollingFileWriters<RowData>
.deleteFileFormat(format())
.equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
.equalityDeleteRowSchema(equalityDeleteRowSchema)
- .positionDeleteRowSchema(positionDeleteRowSchema)
.build();
}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
index e6d64ef2c7..bcfcf5ffa5 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
@@ -18,6 +18,10 @@
*/
package org.apache.iceberg.flink.sink;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
@@ -38,15 +42,13 @@ public class TestFlinkWriterMetrics extends
TestWriterMetrics<RowData> {
.dataSchema(sourceTable.schema())
.dataFileFormat(fileFormat)
.deleteFileFormat(fileFormat)
- .positionDeleteRowSchema(sourceTable.schema())
.build();
}
@Override
protected RowData toRow(Integer id, String data, boolean boolValue, Long
longValue) {
GenericRowData nested = GenericRowData.of(boolValue, longValue);
- GenericRowData row = GenericRowData.of(id, StringData.fromString(data),
nested);
- return row;
+ return GenericRowData.of(id, StringData.fromString(data), nested);
}
@Override
@@ -57,4 +59,14 @@ public class TestFlinkWriterMetrics extends
TestWriterMetrics<RowData> {
}
return row;
}
+
+ @Override
+ protected void checkRowStatistics(Map<Integer, ByteBuffer> bounds) {
+ assertThat(bounds).hasSize(2);
+ }
+
+ @Override
+ protected void checkNotExistingRowStatistics(Map<Integer, ByteBuffer>
bounds) {
+ assertThat(bounds).isNull();
+ }
}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
index 59a4c3118c..67fc79213c 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
@@ -55,7 +55,6 @@ public class TestFlinkMergingMetrics extends
TestMergingMetrics<RowData> {
ImmutableMap.of(),
PartitionSpec.unpartitioned(),
null,
- null,
null)
.newAppender(
Files.localOutput(File.createTempFile("junit", null,
tempDir)), fileFormat);
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index d21bbec81f..07a068391c 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -66,6 +66,23 @@ public class FlinkAppenderFactory implements
FileAppenderFactory<RowData>, Seria
private RowType eqDeleteFlinkSchema = null;
private RowType posDeleteFlinkSchema = null;
+ public FlinkAppenderFactory(
+ Table table,
+ Schema schema,
+ RowType flinkSchema,
+ Map<String, String> props,
+ PartitionSpec spec,
+ int[] equalityFieldIds,
+ Schema eqDeleteRowSchema) {
+ this(table, schema, flinkSchema, props, spec, equalityFieldIds,
eqDeleteRowSchema, null);
+ }
+
+ /**
+ * @deprecated This constructor is deprecated as of version 1.11.0 and will
be removed in 1.12.0.
+ * Position deletes that include row data are no longer supported. Use
{@link
+ * #FlinkAppenderFactory(Table, Schema, RowType, Map, PartitionSpec,
int[], Schema)} instead.
+ */
+ @Deprecated
public FlinkAppenderFactory(
Table table,
Schema schema,
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
index 948dd252d5..b3ada41737 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
@@ -18,7 +18,6 @@
*/
package org.apache.iceberg.flink.sink;
-import static org.apache.iceberg.MetadataColumns.DELETE_FILE_ROW_FIELD_NAME;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT;
@@ -38,7 +37,6 @@ import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.data.FlinkAvroWriter;
import org.apache.iceberg.flink.data.FlinkOrcWriter;
import org.apache.iceberg.flink.data.FlinkParquetWriters;
-import org.apache.iceberg.io.DeleteSchemaUtil;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -47,7 +45,6 @@ import
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
public class FlinkFileWriterFactory extends BaseFileWriterFactory<RowData>
implements Serializable {
private RowType dataFlinkType;
private RowType equalityDeleteFlinkType;
- private RowType positionDeleteFlinkType;
private FlinkFileWriterFactory(
Table table,
@@ -60,8 +57,6 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
Schema equalityDeleteRowSchema,
RowType equalityDeleteFlinkType,
SortOrder equalityDeleteSortOrder,
- Schema positionDeleteRowSchema,
- RowType positionDeleteFlinkType,
Map<String, String> writeProperties) {
super(
@@ -73,12 +68,10 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
equalityFieldIds,
equalityDeleteRowSchema,
equalityDeleteSortOrder,
- positionDeleteRowSchema,
writeProperties);
this.dataFlinkType = dataFlinkType;
this.equalityDeleteFlinkType = equalityDeleteFlinkType;
- this.positionDeleteFlinkType = positionDeleteFlinkType;
}
static Builder builderFor(Table table) {
@@ -96,15 +89,7 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
}
@Override
- protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
- int rowFieldIndex =
positionDeleteFlinkType().getFieldIndex(DELETE_FILE_ROW_FIELD_NAME);
- if (rowFieldIndex >= 0) {
- // FlinkAvroWriter accepts just the Flink type of the row ignoring the
path and pos
- RowType positionDeleteRowFlinkType =
- (RowType) positionDeleteFlinkType().getTypeAt(rowFieldIndex);
- builder.createWriterFunc(ignored -> new
FlinkAvroWriter(positionDeleteRowFlinkType));
- }
- }
+ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {}
@Override
protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
@@ -119,8 +104,6 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
@Override
protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
- builder.createWriterFunc(
- msgType -> FlinkParquetWriters.buildWriter(positionDeleteFlinkType(),
msgType));
builder.transformPaths(path -> StringData.fromString(path.toString()));
}
@@ -138,8 +121,6 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
@Override
protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
- builder.createWriterFunc(
- (iSchema, typDesc) ->
FlinkOrcWriter.buildWriter(positionDeleteFlinkType(), iSchema));
builder.transformPaths(path -> StringData.fromString(path.toString()));
}
@@ -162,17 +143,6 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
return equalityDeleteFlinkType;
}
- private RowType positionDeleteFlinkType() {
- if (positionDeleteFlinkType == null) {
- // wrap the optional row schema into the position delete schema that
contains path and
- // position
- Schema positionDeleteSchema =
DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema());
- this.positionDeleteFlinkType =
FlinkSchemaUtil.convert(positionDeleteSchema);
- }
-
- return positionDeleteFlinkType;
- }
-
public static class Builder {
private final Table table;
private FileFormat dataFileFormat;
@@ -184,8 +154,6 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
private Schema equalityDeleteRowSchema;
private RowType equalityDeleteFlinkType;
private SortOrder equalityDeleteSortOrder;
- private Schema positionDeleteRowSchema;
- private RowType positionDeleteFlinkType;
private Map<String, String> writerProperties = ImmutableMap.of();
public Builder(Table table) {
@@ -257,21 +225,6 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
return this;
}
- public Builder positionDeleteRowSchema(Schema newPositionDeleteRowSchema) {
- this.positionDeleteRowSchema = newPositionDeleteRowSchema;
- return this;
- }
-
- /**
- * Sets a Flink type for position deletes.
- *
- * <p>If not set, the value is derived from the provided Iceberg schema.
- */
- public Builder positionDeleteFlinkType(RowType newPositionDeleteFlinkType)
{
- this.positionDeleteFlinkType = newPositionDeleteFlinkType;
- return this;
- }
-
/** Sets default writer properties. */
public Builder writerProperties(Map<String, String> newWriterProperties) {
this.writerProperties = newWriterProperties;
@@ -296,8 +249,6 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
equalityDeleteRowSchema,
equalityDeleteFlinkType,
equalityDeleteSortOrder,
- positionDeleteRowSchema,
- positionDeleteFlinkType,
writerProperties);
}
}
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java
index 414ee40d13..83997093af 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.sink;
+import java.io.IOException;
import java.util.List;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
@@ -29,6 +30,8 @@ import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.TestFileWriterFactory;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.StructLikeSet;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
public class TestFlinkFileWriterFactory extends TestFileWriterFactory<RowData>
{
@@ -44,7 +47,6 @@ public class TestFlinkFileWriterFactory extends
TestFileWriterFactory<RowData> {
.deleteFileFormat(format())
.equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
.equalityDeleteRowSchema(equalityDeleteRowSchema)
- .positionDeleteRowSchema(positionDeleteRowSchema)
.build();
}
@@ -63,4 +65,9 @@ public class TestFlinkFileWriterFactory extends
TestFileWriterFactory<RowData> {
}
return set;
}
+
+ @Disabled("Position deletes with row data are no longer supported")
+ @Override
+ @TestTemplate
+ public void testPositionDeleteWriterWithRow() throws IOException {}
}
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
index 939ed2be7d..ade4f15f72 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
@@ -55,7 +55,6 @@ public class TestFlinkPartitioningWriters<T> extends
TestPartitioningWriters<Row
.deleteFileFormat(format())
.equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
.equalityDeleteRowSchema(equalityDeleteRowSchema)
- .positionDeleteRowSchema(positionDeleteRowSchema)
.build();
}
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPositionDeltaWriters.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPositionDeltaWriters.java
index 3050752d1c..e4212ffa7d 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPositionDeltaWriters.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPositionDeltaWriters.java
@@ -44,7 +44,6 @@ public class TestFlinkPositionDeltaWriters extends
TestPositionDeltaWriters<RowD
.deleteFileFormat(format())
.equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
.equalityDeleteRowSchema(equalityDeleteRowSchema)
- .positionDeleteRowSchema(positionDeleteRowSchema)
.build();
}
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
index 03051b69cf..b8f37453e6 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
@@ -40,7 +40,6 @@ public class TestFlinkRollingFileWriters extends
TestRollingFileWriters<RowData>
.deleteFileFormat(format())
.equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
.equalityDeleteRowSchema(equalityDeleteRowSchema)
- .positionDeleteRowSchema(positionDeleteRowSchema)
.build();
}
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
index e6d64ef2c7..bcfcf5ffa5 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
@@ -18,6 +18,10 @@
*/
package org.apache.iceberg.flink.sink;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
@@ -38,15 +42,13 @@ public class TestFlinkWriterMetrics extends
TestWriterMetrics<RowData> {
.dataSchema(sourceTable.schema())
.dataFileFormat(fileFormat)
.deleteFileFormat(fileFormat)
- .positionDeleteRowSchema(sourceTable.schema())
.build();
}
@Override
protected RowData toRow(Integer id, String data, boolean boolValue, Long
longValue) {
GenericRowData nested = GenericRowData.of(boolValue, longValue);
- GenericRowData row = GenericRowData.of(id, StringData.fromString(data),
nested);
- return row;
+ return GenericRowData.of(id, StringData.fromString(data), nested);
}
@Override
@@ -57,4 +59,14 @@ public class TestFlinkWriterMetrics extends
TestWriterMetrics<RowData> {
}
return row;
}
+
+ @Override
+ protected void checkRowStatistics(Map<Integer, ByteBuffer> bounds) {
+ assertThat(bounds).hasSize(2);
+ }
+
+ @Override
+ protected void checkNotExistingRowStatistics(Map<Integer, ByteBuffer>
bounds) {
+ assertThat(bounds).isNull();
+ }
}
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
index 59a4c3118c..67fc79213c 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
@@ -55,7 +55,6 @@ public class TestFlinkMergingMetrics extends
TestMergingMetrics<RowData> {
ImmutableMap.of(),
PartitionSpec.unpartitioned(),
null,
- null,
null)
.newAppender(
Files.localOutput(File.createTempFile("junit", null,
tempDir)), fileFormat);
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
index d21bbec81f..07a068391c 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java
@@ -66,6 +66,23 @@ public class FlinkAppenderFactory implements
FileAppenderFactory<RowData>, Seria
private RowType eqDeleteFlinkSchema = null;
private RowType posDeleteFlinkSchema = null;
+ public FlinkAppenderFactory(
+ Table table,
+ Schema schema,
+ RowType flinkSchema,
+ Map<String, String> props,
+ PartitionSpec spec,
+ int[] equalityFieldIds,
+ Schema eqDeleteRowSchema) {
+ this(table, schema, flinkSchema, props, spec, equalityFieldIds,
eqDeleteRowSchema, null);
+ }
+
+ /**
+ * @deprecated This constructor is deprecated as of version 1.11.0 and will
be removed in 1.12.0.
+ * Position deletes that include row data are no longer supported. Use
{@link
+ * #FlinkAppenderFactory(Table, Schema, RowType, Map, PartitionSpec,
int[], Schema)} instead.
+ */
+ @Deprecated
public FlinkAppenderFactory(
Table table,
Schema schema,
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
index 948dd252d5..b3ada41737 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
@@ -18,7 +18,6 @@
*/
package org.apache.iceberg.flink.sink;
-import static org.apache.iceberg.MetadataColumns.DELETE_FILE_ROW_FIELD_NAME;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT;
@@ -38,7 +37,6 @@ import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.data.FlinkAvroWriter;
import org.apache.iceberg.flink.data.FlinkOrcWriter;
import org.apache.iceberg.flink.data.FlinkParquetWriters;
-import org.apache.iceberg.io.DeleteSchemaUtil;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -47,7 +45,6 @@ import
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
public class FlinkFileWriterFactory extends BaseFileWriterFactory<RowData>
implements Serializable {
private RowType dataFlinkType;
private RowType equalityDeleteFlinkType;
- private RowType positionDeleteFlinkType;
private FlinkFileWriterFactory(
Table table,
@@ -60,8 +57,6 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
Schema equalityDeleteRowSchema,
RowType equalityDeleteFlinkType,
SortOrder equalityDeleteSortOrder,
- Schema positionDeleteRowSchema,
- RowType positionDeleteFlinkType,
Map<String, String> writeProperties) {
super(
@@ -73,12 +68,10 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
equalityFieldIds,
equalityDeleteRowSchema,
equalityDeleteSortOrder,
- positionDeleteRowSchema,
writeProperties);
this.dataFlinkType = dataFlinkType;
this.equalityDeleteFlinkType = equalityDeleteFlinkType;
- this.positionDeleteFlinkType = positionDeleteFlinkType;
}
static Builder builderFor(Table table) {
@@ -96,15 +89,7 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
}
@Override
- protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
- int rowFieldIndex =
positionDeleteFlinkType().getFieldIndex(DELETE_FILE_ROW_FIELD_NAME);
- if (rowFieldIndex >= 0) {
- // FlinkAvroWriter accepts just the Flink type of the row ignoring the
path and pos
- RowType positionDeleteRowFlinkType =
- (RowType) positionDeleteFlinkType().getTypeAt(rowFieldIndex);
- builder.createWriterFunc(ignored -> new
FlinkAvroWriter(positionDeleteRowFlinkType));
- }
- }
+ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {}
@Override
protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
@@ -119,8 +104,6 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
@Override
protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
- builder.createWriterFunc(
- msgType -> FlinkParquetWriters.buildWriter(positionDeleteFlinkType(),
msgType));
builder.transformPaths(path -> StringData.fromString(path.toString()));
}
@@ -138,8 +121,6 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
@Override
protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
- builder.createWriterFunc(
- (iSchema, typDesc) ->
FlinkOrcWriter.buildWriter(positionDeleteFlinkType(), iSchema));
builder.transformPaths(path -> StringData.fromString(path.toString()));
}
@@ -162,17 +143,6 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
return equalityDeleteFlinkType;
}
- private RowType positionDeleteFlinkType() {
- if (positionDeleteFlinkType == null) {
- // wrap the optional row schema into the position delete schema that
contains path and
- // position
- Schema positionDeleteSchema =
DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema());
- this.positionDeleteFlinkType =
FlinkSchemaUtil.convert(positionDeleteSchema);
- }
-
- return positionDeleteFlinkType;
- }
-
public static class Builder {
private final Table table;
private FileFormat dataFileFormat;
@@ -184,8 +154,6 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
private Schema equalityDeleteRowSchema;
private RowType equalityDeleteFlinkType;
private SortOrder equalityDeleteSortOrder;
- private Schema positionDeleteRowSchema;
- private RowType positionDeleteFlinkType;
private Map<String, String> writerProperties = ImmutableMap.of();
public Builder(Table table) {
@@ -257,21 +225,6 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
return this;
}
- public Builder positionDeleteRowSchema(Schema newPositionDeleteRowSchema) {
- this.positionDeleteRowSchema = newPositionDeleteRowSchema;
- return this;
- }
-
- /**
- * Sets a Flink type for position deletes.
- *
- * <p>If not set, the value is derived from the provided Iceberg schema.
- */
- public Builder positionDeleteFlinkType(RowType newPositionDeleteFlinkType)
{
- this.positionDeleteFlinkType = newPositionDeleteFlinkType;
- return this;
- }
-
/** Sets default writer properties. */
public Builder writerProperties(Map<String, String> newWriterProperties) {
this.writerProperties = newWriterProperties;
@@ -296,8 +249,6 @@ public class FlinkFileWriterFactory extends
BaseFileWriterFactory<RowData> imple
equalityDeleteRowSchema,
equalityDeleteFlinkType,
equalityDeleteSortOrder,
- positionDeleteRowSchema,
- positionDeleteFlinkType,
writerProperties);
}
}
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java
index 414ee40d13..83997093af 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkFileWriterFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.sink;
+import java.io.IOException;
import java.util.List;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
@@ -29,6 +30,8 @@ import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.TestFileWriterFactory;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.StructLikeSet;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
public class TestFlinkFileWriterFactory extends TestFileWriterFactory<RowData>
{
@@ -44,7 +47,6 @@ public class TestFlinkFileWriterFactory extends
TestFileWriterFactory<RowData> {
.deleteFileFormat(format())
.equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
.equalityDeleteRowSchema(equalityDeleteRowSchema)
- .positionDeleteRowSchema(positionDeleteRowSchema)
.build();
}
@@ -63,4 +65,9 @@ public class TestFlinkFileWriterFactory extends
TestFileWriterFactory<RowData> {
}
return set;
}
+
+ @Disabled("Position deletes with row data are no longer supported")
+ @Override
+ @TestTemplate
+ public void testPositionDeleteWriterWithRow() throws IOException {}
}
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
index 939ed2be7d..ade4f15f72 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
@@ -55,7 +55,6 @@ public class TestFlinkPartitioningWriters<T> extends
TestPartitioningWriters<Row
.deleteFileFormat(format())
.equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
.equalityDeleteRowSchema(equalityDeleteRowSchema)
- .positionDeleteRowSchema(positionDeleteRowSchema)
.build();
}
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPositionDeltaWriters.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPositionDeltaWriters.java
index 3050752d1c..e4212ffa7d 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPositionDeltaWriters.java
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPositionDeltaWriters.java
@@ -44,7 +44,6 @@ public class TestFlinkPositionDeltaWriters extends
TestPositionDeltaWriters<RowD
.deleteFileFormat(format())
.equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
.equalityDeleteRowSchema(equalityDeleteRowSchema)
- .positionDeleteRowSchema(positionDeleteRowSchema)
.build();
}
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
index 03051b69cf..b8f37453e6 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
@@ -40,7 +40,6 @@ public class TestFlinkRollingFileWriters extends
TestRollingFileWriters<RowData>
.deleteFileFormat(format())
.equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
.equalityDeleteRowSchema(equalityDeleteRowSchema)
- .positionDeleteRowSchema(positionDeleteRowSchema)
.build();
}
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
index e6d64ef2c7..bcfcf5ffa5 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
@@ -18,6 +18,10 @@
*/
package org.apache.iceberg.flink.sink;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
@@ -38,15 +42,13 @@ public class TestFlinkWriterMetrics extends
TestWriterMetrics<RowData> {
.dataSchema(sourceTable.schema())
.dataFileFormat(fileFormat)
.deleteFileFormat(fileFormat)
- .positionDeleteRowSchema(sourceTable.schema())
.build();
}
@Override
protected RowData toRow(Integer id, String data, boolean boolValue, Long
longValue) {
GenericRowData nested = GenericRowData.of(boolValue, longValue);
- GenericRowData row = GenericRowData.of(id, StringData.fromString(data),
nested);
- return row;
+ return GenericRowData.of(id, StringData.fromString(data), nested);
}
@Override
@@ -57,4 +59,14 @@ public class TestFlinkWriterMetrics extends
TestWriterMetrics<RowData> {
}
return row;
}
+
+ @Override
+ protected void checkRowStatistics(Map<Integer, ByteBuffer> bounds) {
+ assertThat(bounds).hasSize(2);
+ }
+
+ @Override
+ protected void checkNotExistingRowStatistics(Map<Integer, ByteBuffer>
bounds) {
+ assertThat(bounds).isNull();
+ }
}
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
index 59a4c3118c..67fc79213c 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
@@ -55,7 +55,6 @@ public class TestFlinkMergingMetrics extends
TestMergingMetrics<RowData> {
ImmutableMap.of(),
PartitionSpec.unpartitioned(),
null,
- null,
null)
.newAppender(
Files.localOutput(File.createTempFile("junit", null,
tempDir)), fileFormat);
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
index 50a1259c86..a93db17e4a 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
@@ -50,6 +50,13 @@ class SparkFileWriterFactory extends
BaseFileWriterFactory<InternalRow> {
private StructType positionDeleteSparkType;
private final Map<String, String> writeProperties;
+ /**
+ * @deprecated This constructor is deprecated as of version 1.11.0 and will
be removed in 1.12.0.
+ * Position deletes that include row data are no longer supported. Use
{@link
+ * #SparkFileWriterFactory(Table, FileFormat, Schema, StructType,
SortOrder, FileFormat,
+ * int[], Schema, StructType, SortOrder, Map)} instead.
+ */
+ @Deprecated
SparkFileWriterFactory(
Table table,
FileFormat dataFileFormat,
@@ -82,6 +89,36 @@ class SparkFileWriterFactory extends
BaseFileWriterFactory<InternalRow> {
this.writeProperties = writeProperties != null ? writeProperties :
ImmutableMap.of();
}
+ SparkFileWriterFactory(
+ Table table,
+ FileFormat dataFileFormat,
+ Schema dataSchema,
+ StructType dataSparkType,
+ SortOrder dataSortOrder,
+ FileFormat deleteFileFormat,
+ int[] equalityFieldIds,
+ Schema equalityDeleteRowSchema,
+ StructType equalityDeleteSparkType,
+ SortOrder equalityDeleteSortOrder,
+ Map<String, String> writeProperties) {
+
+ super(
+ table,
+ dataFileFormat,
+ dataSchema,
+ dataSortOrder,
+ deleteFileFormat,
+ equalityFieldIds,
+ equalityDeleteRowSchema,
+ equalityDeleteSortOrder,
+ ImmutableMap.of());
+
+ this.dataSparkType = dataSparkType;
+ this.equalityDeleteSparkType = equalityDeleteSparkType;
+ this.positionDeleteSparkType = null;
+ this.writeProperties = writeProperties != null ? writeProperties :
ImmutableMap.of();
+ }
+
static Builder builderFor(Table table) {
return new Builder(table);
}
@@ -255,11 +292,21 @@ class SparkFileWriterFactory extends
BaseFileWriterFactory<InternalRow> {
return this;
}
+ /**
+ * @deprecated This method is deprecated as of version 1.11.0 and will be
removed in 1.12.0.
+ * Position deletes that include row data are no longer supported.
+ */
+ @Deprecated
Builder positionDeleteRowSchema(Schema newPositionDeleteRowSchema) {
this.positionDeleteRowSchema = newPositionDeleteRowSchema;
return this;
}
+ /**
+ * @deprecated This method is deprecated as of version 1.11.0 and will be
removed in 1.12.0.
+ * Position deletes that include row data are no longer supported.
+ */
+ @Deprecated
Builder positionDeleteSparkType(StructType newPositionDeleteSparkType) {
this.positionDeleteSparkType = newPositionDeleteSparkType;
return this;
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 668d2ab05e..f1fd7b7ff9 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -590,7 +590,7 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
String file = id.getString(fileOrdinal);
long position = id.getLong(positionOrdinal);
- positionDelete.set(file, position, null);
+ positionDelete.set(file, position);
delegate.write(positionDelete, spec, partitionProjection);
}
@@ -822,6 +822,9 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
return targetDataFileSize;
}
+ /* @deprecated This method is deprecated as of version 1.11.0 and will be
removed in 1.12.0.
+ * Position deletes that include row data are no longer supported.
+ */
StructType deleteSparkType() {
return deleteSparkType;
}
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
index 50a1259c86..a93db17e4a 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
@@ -50,6 +50,13 @@ class SparkFileWriterFactory extends
BaseFileWriterFactory<InternalRow> {
private StructType positionDeleteSparkType;
private final Map<String, String> writeProperties;
+ /**
+ * @deprecated This constructor is deprecated as of version 1.11.0 and will
be removed in 1.12.0.
+ * Position deletes that include row data are no longer supported. Use
{@link
+ * #SparkFileWriterFactory(Table, FileFormat, Schema, StructType,
SortOrder, FileFormat,
+ * int[], Schema, StructType, SortOrder, Map)} instead.
+ */
+ @Deprecated
SparkFileWriterFactory(
Table table,
FileFormat dataFileFormat,
@@ -82,6 +89,36 @@ class SparkFileWriterFactory extends
BaseFileWriterFactory<InternalRow> {
this.writeProperties = writeProperties != null ? writeProperties :
ImmutableMap.of();
}
+ SparkFileWriterFactory(
+ Table table,
+ FileFormat dataFileFormat,
+ Schema dataSchema,
+ StructType dataSparkType,
+ SortOrder dataSortOrder,
+ FileFormat deleteFileFormat,
+ int[] equalityFieldIds,
+ Schema equalityDeleteRowSchema,
+ StructType equalityDeleteSparkType,
+ SortOrder equalityDeleteSortOrder,
+ Map<String, String> writeProperties) {
+
+ super(
+ table,
+ dataFileFormat,
+ dataSchema,
+ dataSortOrder,
+ deleteFileFormat,
+ equalityFieldIds,
+ equalityDeleteRowSchema,
+ equalityDeleteSortOrder,
+ ImmutableMap.of());
+
+ this.dataSparkType = dataSparkType;
+ this.equalityDeleteSparkType = equalityDeleteSparkType;
+ this.positionDeleteSparkType = null;
+ this.writeProperties = writeProperties != null ? writeProperties :
ImmutableMap.of();
+ }
+
static Builder builderFor(Table table) {
return new Builder(table);
}
@@ -255,11 +292,21 @@ class SparkFileWriterFactory extends
BaseFileWriterFactory<InternalRow> {
return this;
}
+ /**
+ * @deprecated This method is deprecated as of version 1.11.0 and will be
removed in 1.12.0.
+ * Position deletes that include row data are no longer supported.
+ */
+ @Deprecated
Builder positionDeleteRowSchema(Schema newPositionDeleteRowSchema) {
this.positionDeleteRowSchema = newPositionDeleteRowSchema;
return this;
}
+ /**
+ * @deprecated This method is deprecated as of version 1.11.0 and will be
removed in 1.12.0.
+ * Position deletes that include row data are no longer supported.
+ */
+ @Deprecated
Builder positionDeleteSparkType(StructType newPositionDeleteSparkType) {
this.positionDeleteSparkType = newPositionDeleteSparkType;
return this;
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 39bfe38848..ddad1a749a 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -594,7 +594,7 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
String file = id.getString(fileOrdinal);
long position = id.getLong(positionOrdinal);
- positionDelete.set(file, position, null);
+ positionDelete.set(file, position);
delegate.write(positionDelete, spec, partitionProjection);
}
@@ -826,6 +826,9 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
return targetDataFileSize;
}
+ /* @deprecated This method is deprecated as of version 1.11.0 and will be
removed in 1.12.0.
+ * Position deletes that include row data are no longer supported.
+ */
StructType deleteSparkType() {
return deleteSparkType;
}
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
index 50a1259c86..a93db17e4a 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
@@ -50,6 +50,13 @@ class SparkFileWriterFactory extends
BaseFileWriterFactory<InternalRow> {
private StructType positionDeleteSparkType;
private final Map<String, String> writeProperties;
+ /**
+ * @deprecated This constructor is deprecated as of version 1.11.0 and will
be removed in 1.12.0.
+ * Position deletes that include row data are no longer supported. Use
{@link
+ * #SparkFileWriterFactory(Table, FileFormat, Schema, StructType,
SortOrder, FileFormat,
+ * int[], Schema, StructType, SortOrder, Map)} instead.
+ */
+ @Deprecated
SparkFileWriterFactory(
Table table,
FileFormat dataFileFormat,
@@ -82,6 +89,36 @@ class SparkFileWriterFactory extends
BaseFileWriterFactory<InternalRow> {
this.writeProperties = writeProperties != null ? writeProperties :
ImmutableMap.of();
}
+ SparkFileWriterFactory(
+ Table table,
+ FileFormat dataFileFormat,
+ Schema dataSchema,
+ StructType dataSparkType,
+ SortOrder dataSortOrder,
+ FileFormat deleteFileFormat,
+ int[] equalityFieldIds,
+ Schema equalityDeleteRowSchema,
+ StructType equalityDeleteSparkType,
+ SortOrder equalityDeleteSortOrder,
+ Map<String, String> writeProperties) {
+
+ super(
+ table,
+ dataFileFormat,
+ dataSchema,
+ dataSortOrder,
+ deleteFileFormat,
+ equalityFieldIds,
+ equalityDeleteRowSchema,
+ equalityDeleteSortOrder,
+ ImmutableMap.of());
+
+ this.dataSparkType = dataSparkType;
+ this.equalityDeleteSparkType = equalityDeleteSparkType;
+ this.positionDeleteSparkType = null;
+ this.writeProperties = writeProperties != null ? writeProperties :
ImmutableMap.of();
+ }
+
static Builder builderFor(Table table) {
return new Builder(table);
}
@@ -255,11 +292,21 @@ class SparkFileWriterFactory extends
BaseFileWriterFactory<InternalRow> {
return this;
}
+ /**
+ * @deprecated This method is deprecated as of version 1.11.0 and will be
removed in 1.12.0.
+ * Position deletes that include row data are no longer supported.
+ */
+ @Deprecated
Builder positionDeleteRowSchema(Schema newPositionDeleteRowSchema) {
this.positionDeleteRowSchema = newPositionDeleteRowSchema;
return this;
}
+ /**
+ * @deprecated This method is deprecated as of version 1.11.0 and will be
removed in 1.12.0.
+ * Position deletes that include row data are no longer supported.
+ */
+ @Deprecated
Builder positionDeleteSparkType(StructType newPositionDeleteSparkType) {
this.positionDeleteSparkType = newPositionDeleteSparkType;
return this;
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 88ca202cc9..d072397dc6 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -608,7 +608,7 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
String file = id.getString(fileOrdinal);
long position = id.getLong(positionOrdinal);
- positionDelete.set(file, position, null);
+ positionDelete.set(file, position);
delegate.write(positionDelete, spec, partitionProjection);
}
@@ -883,6 +883,11 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
return targetDataFileSize;
}
+ /**
+ * @deprecated This method is deprecated as of version 1.11.0 and will be
removed in 1.12.0.
+ * Position deletes that include row data are no longer supported.
+ */
+ @Deprecated
StructType deleteSparkType() {
return deleteSparkType;
}