This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 345140e9f7 AWS, Core, Data, Spark: Remove deprecations for 1.11.0 (#14059)
345140e9f7 is described below

commit 345140e9f7d428fcc3b4fcc639a690084a698b76
Author: Doğukan Çağatay <970872+dogukancaga...@users.noreply.github.com>
AuthorDate: Fri Sep 19 00:35:17 2025 +0200

    AWS, Core, Data, Spark: Remove deprecations for 1.11.0 (#14059)
    
    Co-authored-by: Eduard Tudenhoefner <etudenhoef...@gmail.com>
---
 .palantir/revapi.yml                               |  57 ++
 .../java/org/apache/iceberg/aws/s3/S3FileIO.java   |  39 +-
 .../org/apache/iceberg/aws/s3/S3InputFile.java     |  88 ---
 .../org/apache/iceberg/aws/s3/S3OutputFile.java    |  41 --
 .../org/apache/iceberg/PartitionStatsHandler.java  |  19 -
 .../org/apache/iceberg/PartitionStatsUtil.java     | 144 -----
 .../org/apache/iceberg/RewriteTablePathUtil.java   |  91 ----
 .../org/apache/iceberg/TableMetadataParser.java    |   8 -
 .../apache/iceberg/encryption/EncryptionUtil.java  |  12 -
 .../org/apache/iceberg/rest/auth/OAuth2Util.java   |  91 +---
 .../iceberg/PartitionStatsHandlerTestBase.java     |   4 +-
 .../org/apache/iceberg/TestPartitionStatsUtil.java | 588 ---------------------
 .../apache/iceberg/TestRewriteTablePathUtil.java   |  11 +-
 .../iceberg/PartitionStatsHandlerBenchmark.java    |   2 +-
 .../apache/iceberg/data/PartitionStatsHandler.java | 282 ----------
 .../java/org/apache/iceberg/gcp/gcs/GCSFileIO.java |  18 -
 .../actions/TestComputePartitionStatsAction.java   |   4 +-
 .../extensions/TestRewriteDataFilesProcedure.java  |   2 +-
 .../extensions/TestRewriteManifestsProcedure.java  |   2 +-
 .../actions/TestComputePartitionStatsAction.java   |   4 +-
 .../extensions/TestRewriteDataFilesProcedure.java  |   2 +-
 .../extensions/TestRewriteManifestsProcedure.java  |   2 +-
 .../actions/TestComputePartitionStatsAction.java   |   4 +-
 23 files changed, 82 insertions(+), 1433 deletions(-)

diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml
index 1c871ba30c..8605745ae7 100644
--- a/.palantir/revapi.yml
+++ b/.palantir/revapi.yml
@@ -1363,6 +1363,63 @@ acceptedBreaks:
       old: "method 
org.apache.iceberg.parquet.ParquetValueWriters.StructWriter<org.apache.iceberg.data.Record>\
         \ 
org.apache.iceberg.data.parquet.GenericParquetWriter::createStructWriter(java.util.List<org.apache.iceberg.parquet.ParquetValueWriter<?>>)"
       justification: "Removing deprecations for 1.10.0"
+  "1.10.0":
+    org.apache.iceberg:iceberg-core:
+      - code: "java.class.removed"
+        old: "class org.apache.iceberg.PartitionStatsUtil"
+        justification: "Removing deprecated code for 1.11.0"
+      - code: "java.method.removed"
+        old: "method java.lang.String 
org.apache.iceberg.RewriteTablePathUtil::stagingPath(java.lang.String,\
+        \ java.lang.String)"
+        justification: "Removing deprecated code for 1.11.0"
+      - code: "java.method.removed"
+        old: "method 
org.apache.iceberg.RewriteTablePathUtil.RewriteResult<org.apache.iceberg.DataFile>\
+        \ 
org.apache.iceberg.RewriteTablePathUtil::rewriteDataManifest(org.apache.iceberg.ManifestFile,\
+        \ org.apache.iceberg.io.OutputFile, org.apache.iceberg.io.FileIO, int, 
java.util.Map<java.lang.Integer,\
+        \ org.apache.iceberg.PartitionSpec>, java.lang.String, 
java.lang.String) throws\
+        \ java.io.IOException"
+        justification: "Removing deprecated code for 1.11.0"
+      - code: "java.method.removed"
+        old: "method 
org.apache.iceberg.RewriteTablePathUtil.RewriteResult<org.apache.iceberg.DeleteFile>\
+        \ 
org.apache.iceberg.RewriteTablePathUtil::rewriteDeleteManifest(org.apache.iceberg.ManifestFile,\
+        \ org.apache.iceberg.io.OutputFile, org.apache.iceberg.io.FileIO, int, 
java.util.Map<java.lang.Integer,\
+        \ org.apache.iceberg.PartitionSpec>, java.lang.String, 
java.lang.String, java.lang.String)\
+        \ throws java.io.IOException"
+        justification: "Removing deprecated code for 1.11.0"
+      - code: "java.method.removed"
+        old: "method org.apache.iceberg.Schema 
org.apache.iceberg.PartitionStatsHandler::schema(org.apache.iceberg.types.Types.StructType)"
+        justification: "Removing deprecated code for 1.11.0"
+      - code: "java.method.removed"
+        old: "method org.apache.iceberg.TableMetadata 
org.apache.iceberg.TableMetadataParser::read(org.apache.iceberg.io.FileIO,\
+        \ org.apache.iceberg.io.InputFile)"
+        justification: "Removing deprecated code for 1.11.0"
+      - code: "java.method.removed"
+        old: "method org.apache.iceberg.encryption.EncryptionManager 
org.apache.iceberg.encryption.EncryptionUtil::createEncryptionManager(java.util.Map<java.lang.String,\
+        \ java.lang.String>, 
org.apache.iceberg.encryption.KeyManagementClient)"
+        justification: "Removing deprecated code for 1.11.0"
+      - code: "java.method.removed"
+        old: "method org.apache.iceberg.rest.responses.OAuthTokenResponse 
org.apache.iceberg.rest.auth.OAuth2Util::exchangeToken(org.apache.iceberg.rest.RESTClient,\
+        \ java.util.Map<java.lang.String, java.lang.String>, java.lang.String, 
java.lang.String,\
+        \ java.lang.String, java.lang.String, java.lang.String)"
+        justification: "Removing deprecated code for 1.11.0"
+      - code: "java.method.removed"
+        old: "method org.apache.iceberg.rest.responses.OAuthTokenResponse 
org.apache.iceberg.rest.auth.OAuth2Util::exchangeToken(org.apache.iceberg.rest.RESTClient,\
+        \ java.util.Map<java.lang.String, java.lang.String>, java.lang.String, 
java.lang.String,\
+        \ java.lang.String, java.lang.String, java.lang.String, 
java.lang.String)"
+        justification: "Removing deprecated code for 1.11.0"
+      - code: "java.method.removed"
+        old: "method org.apache.iceberg.rest.responses.OAuthTokenResponse 
org.apache.iceberg.rest.auth.OAuth2Util::fetchToken(org.apache.iceberg.rest.RESTClient,\
+        \ java.util.Map<java.lang.String, java.lang.String>, java.lang.String, 
java.lang.String)"
+        justification: "Removing deprecated code for 1.11.0"
+      - code: "java.method.removed"
+        old: "method org.apache.iceberg.rest.responses.OAuthTokenResponse 
org.apache.iceberg.rest.auth.OAuth2Util::fetchToken(org.apache.iceberg.rest.RESTClient,\
+        \ java.util.Map<java.lang.String, java.lang.String>, java.lang.String, 
java.lang.String,\
+        \ java.lang.String)"
+        justification: "Removing deprecated code for 1.11.0"
+    org.apache.iceberg:iceberg-data:
+      - code: "java.class.removed"
+        old: "class org.apache.iceberg.data.PartitionStatsHandler"
+        justification: "Removing deprecated code for 1.11.0"
   apache-iceberg-0.14.0:
     org.apache.iceberg:iceberg-api:
     - code: "java.class.defaultSerializationChanged"
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
index 41d45db357..d5e51ed74a 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -122,7 +122,7 @@ public class S3FileIO
    * @param s3 s3 supplier
    */
   public S3FileIO(SerializableSupplier<S3Client> s3) {
-    this(s3, null, new S3FileIOProperties());
+    this(s3, null);
   }
 
   /**
@@ -134,45 +134,10 @@ public class S3FileIO
    * @param s3Async s3Async supplier
    */
   public S3FileIO(SerializableSupplier<S3Client> s3, SerializableSupplier<S3AsyncClient> s3Async) {
-    this(s3, s3Async, new S3FileIOProperties());
-  }
-
-  /**
-   * Constructor with custom s3 supplier and S3FileIO properties.
-   *
-   * <p>Calling {@link S3FileIO#initialize(Map)} will overwrite information set in this constructor.
-   *
-   * @param s3 s3 supplier
-   * @param s3FileIOProperties S3 FileIO properties
-   * @deprecated since 1.10.0, will be removed in 1.11.0; use {@link
-   *     S3FileIO#S3FileIO(SerializableSupplier)} with {@link S3FileIO#initialize(Map)} instead
-   */
-  @Deprecated
-  public S3FileIO(SerializableSupplier<S3Client> s3, S3FileIOProperties s3FileIOProperties) {
-    this(s3, null, s3FileIOProperties);
-  }
-
-  /**
-   * Constructor with custom s3 supplier, s3Async supplier and S3FileIO properties.
-   *
-   * <p>Calling {@link S3FileIO#initialize(Map)} will overwrite information set in this constructor.
-   *
-   * @param s3 s3 supplier
-   * @param s3Async s3Async supplier
-   * @param s3FileIOProperties S3 FileIO properties
-   * @deprecated since 1.10.0, will be removed in 1.11.0; use {@link
-   *     S3FileIO#S3FileIO(SerializableSupplier, SerializableSupplier)} with {@link
-   *     S3FileIO#initialize(Map)} instead
-   */
-  @Deprecated
-  public S3FileIO(
-      SerializableSupplier<S3Client> s3,
-      SerializableSupplier<S3AsyncClient> s3Async,
-      S3FileIOProperties s3FileIOProperties) {
     this.s3 = s3;
     this.s3Async = s3Async;
     this.createStack = Thread.currentThread().getStackTrace();
-    this.properties = SerializableMap.copyOf(s3FileIOProperties.properties());
+    this.properties = SerializableMap.copyOf(Maps.newHashMap());
   }
 
   @Override
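
Migration note: callers of the removed property-taking constructors can route
configuration through initialize(Map) instead. A minimal sketch, assuming an
existing SerializableSupplier<S3Client> named s3Supplier; the property key is
illustrative:

    import java.util.Map;
    import org.apache.iceberg.aws.s3.S3FileIO;

    S3FileIO fileIO = new S3FileIO(s3Supplier); // was: new S3FileIO(s3Supplier, props)
    // initialize(Map) now carries what the S3FileIOProperties argument carried before
    fileIO.initialize(Map.of("s3.endpoint", "https://s3.us-west-2.amazonaws.com"));
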
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java
index 63ce84e2fe..5e4346fe9f 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java
@@ -30,94 +30,6 @@ public class S3InputFile extends BaseS3File implements InputFile, NativelyEncryp
   private NativeFileCryptoParameters nativeDecryptionParameters;
   private Long length;
 
-  /**
-   * Creates a {@link S3InputFile} from the given parameters.
-   *
-   * @deprecated since 1.10.0, will be removed in 1.11.0; use {@link
-   *     S3InputFile#fromLocation(String, PrefixedS3Client, MetricsContext)} instead.
-   */
-  @Deprecated
-  public static S3InputFile fromLocation(
-      String location,
-      S3Client client,
-      S3FileIOProperties s3FileIOProperties,
-      MetricsContext metrics) {
-    return new S3InputFile(
-        client,
-        null,
-        new S3URI(location, s3FileIOProperties.bucketToAccessPointMapping()),
-        null,
-        s3FileIOProperties,
-        metrics);
-  }
-
-  /**
-   * Creates a {@link S3InputFile} from the given parameters.
-   *
-   * @deprecated since 1.10.0, will be removed in 1.11.0; use {@link
-   *     S3InputFile#fromLocation(String, PrefixedS3Client, MetricsContext)} instead.
-   */
-  @Deprecated
-  public static S3InputFile fromLocation(
-      String location,
-      S3Client client,
-      S3AsyncClient asyncClient,
-      S3FileIOProperties s3FileIOProperties,
-      MetricsContext metrics) {
-    return new S3InputFile(
-        client,
-        asyncClient,
-        new S3URI(location, s3FileIOProperties.bucketToAccessPointMapping()),
-        null,
-        s3FileIOProperties,
-        metrics);
-  }
-
-  /**
-   * Creates a {@link S3InputFile} from the given parameters.
-   *
-   * @deprecated since 1.10.0, will be removed in 1.11.0; use {@link
-   *     S3InputFile#fromLocation(String, long, PrefixedS3Client, MetricsContext)} instead.
-   */
-  @Deprecated
-  public static S3InputFile fromLocation(
-      String location,
-      long length,
-      S3Client client,
-      S3FileIOProperties s3FileIOProperties,
-      MetricsContext metrics) {
-    return new S3InputFile(
-        client,
-        null,
-        new S3URI(location, s3FileIOProperties.bucketToAccessPointMapping()),
-        length > 0 ? length : null,
-        s3FileIOProperties,
-        metrics);
-  }
-
-  /**
-   * Creates a {@link S3InputFile} from the given parameters.
-   *
-   * @deprecated since 1.10.0, will be removed in 1.11.0; use {@link
-   *     S3InputFile#fromLocation(String, long, PrefixedS3Client, MetricsContext)} instead.
-   */
-  @Deprecated
-  public static S3InputFile fromLocation(
-      String location,
-      long length,
-      S3Client client,
-      S3AsyncClient asyncClient,
-      S3FileIOProperties s3FileIOProperties,
-      MetricsContext metrics) {
-    return new S3InputFile(
-        client,
-        asyncClient,
-        new S3URI(location, s3FileIOProperties.bucketToAccessPointMapping()),
-        length > 0 ? length : null,
-        s3FileIOProperties,
-        metrics);
-  }
-
   static S3InputFile fromLocation(
       String location, PrefixedS3Client client, MetricsContext metrics) {
     return fromLocation(location, 0, client, metrics);
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java
index fa12bac311..3fcd3cdbd5 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java
@@ -33,47 +33,6 @@ import software.amazon.awssdk.services.s3.S3Client;
 public class S3OutputFile extends BaseS3File implements OutputFile, NativelyEncryptedFile {
   private NativeFileCryptoParameters nativeEncryptionParameters;
 
-  /**
-   * Creates a {@link S3OutputFile} from the given parameters.
-   *
-   * @deprecated since 1.10.0, will be removed in 1.11.0; use {@link
-   *     S3OutputFile#fromLocation(String, PrefixedS3Client, MetricsContext)} instead.
-   */
-  @Deprecated
-  public static S3OutputFile fromLocation(
-      String location,
-      S3Client client,
-      S3FileIOProperties s3FileIOProperties,
-      MetricsContext metrics) {
-    return new S3OutputFile(
-        client,
-        null,
-        new S3URI(location, s3FileIOProperties.bucketToAccessPointMapping()),
-        s3FileIOProperties,
-        metrics);
-  }
-
-  /**
-   * Creates a {@link S3OutputFile} from the given parameters.
-   *
-   * @deprecated since 1.10.0, will be removed in 1.11.0; use {@link
-   *     S3OutputFile#fromLocation(String, PrefixedS3Client, MetricsContext)} instead.
-   */
-  @Deprecated
-  public static S3OutputFile fromLocation(
-      String location,
-      S3Client client,
-      S3AsyncClient asyncClient,
-      S3FileIOProperties s3FileIOProperties,
-      MetricsContext metrics) {
-    return new S3OutputFile(
-        client,
-        asyncClient,
-        new S3URI(location, s3FileIOProperties.bucketToAccessPointMapping()),
-        s3FileIOProperties,
-        metrics);
-  }
-
   static S3OutputFile fromLocation(
       String location, PrefixedS3Client client, MetricsContext metrics) {
     return new S3OutputFile(
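
Migration note: the removed public fromLocation factories have package-private
replacements that take PrefixedS3Client, so code outside the package should go
through the FileIO interface instead. A sketch, assuming an initialized
S3FileIO named fileIO and illustrative S3 paths:

    import org.apache.iceberg.io.InputFile;
    import org.apache.iceberg.io.OutputFile;

    // replaces S3InputFile.fromLocation(...) and S3OutputFile.fromLocation(...)
    InputFile in = fileIO.newInputFile("s3://bucket/data/file.parquet");
    OutputFile out = fileIO.newOutputFile("s3://bucket/data/file-copy.parquet");
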
diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
index 6d2fc96c08..4e7c1b104e 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java
@@ -100,25 +100,6 @@ public class PartitionStatsHandler {
           .withWriteDefault(Literal.of(0))
           .build();
 
-  /**
-   * Generates the partition stats file schema based on a combined partition type which considers
-   * all specs in a table.
-   *
-   * <p>Use this only for format version 1 and 2. For version 3 and above use {@link
-   * #schema(StructType, int)}
-   *
-   * @param unifiedPartitionType unified partition schema type. Could be calculated by {@link
-   *     Partitioning#partitionType(Table)}.
-   * @return a schema that corresponds to the provided unified partition type.
-   * @deprecated since 1.10.0, will be removed in 1.11.0. Use {@link #schema(StructType, int)}
-   *     instead.
-   */
-  @Deprecated
-  public static Schema schema(StructType unifiedPartitionType) {
-    Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned");
-    return v2Schema(unifiedPartitionType);
-  }
-
   /**
    * Generates the partition stats file schema for a given format version based on a combined
    * partition type which considers all specs in a table.
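
Migration note: the removed single-argument schema(StructType) produced the v2
layout, so existing callers pass the format version explicitly; the test and
benchmark updates below use 2. A sketch, assuming a partitioned Table named
table:

    import org.apache.iceberg.Partitioning;
    import org.apache.iceberg.PartitionStatsHandler;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    Types.StructType partitionType = Partitioning.partitionType(table);
    Schema statsSchema = PartitionStatsHandler.schema(partitionType, 2); // was: schema(partitionType)
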
diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java b/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
deleted file mode 100644
index ceb0fb9005..0000000000
--- a/core/src/main/java/org/apache/iceberg/PartitionStatsUtil.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Queues;
-import org.apache.iceberg.types.Comparators;
-import org.apache.iceberg.types.Types.StructType;
-import org.apache.iceberg.util.PartitionMap;
-import org.apache.iceberg.util.PartitionUtil;
-import org.apache.iceberg.util.Tasks;
-import org.apache.iceberg.util.ThreadPools;
-
-/**
- * @deprecated since 1.10.0, will be removed in 1.11.0; use {@link
- *     org.apache.iceberg.PartitionStatsHandler} directly
- */
-@Deprecated
-public class PartitionStatsUtil {
-
-  private PartitionStatsUtil() {}
-
-  /**
-   * Computes the partition stats for the given snapshot of the table.
-   *
-   * @param table the table for which partition stats to be computed.
-   * @param snapshot the snapshot for which partition stats is computed.
-   * @return the collection of {@link PartitionStats}
-   */
-  public static Collection<PartitionStats> computeStats(Table table, Snapshot snapshot) {
-    Preconditions.checkArgument(table != null, "table cannot be null");
-    Preconditions.checkArgument(Partitioning.isPartitioned(table), "table must be partitioned");
-    Preconditions.checkArgument(snapshot != null, "snapshot cannot be null");
-
-    StructType partitionType = Partitioning.partitionType(table);
-    List<ManifestFile> manifests = snapshot.allManifests(table.io());
-    Queue<PartitionMap<PartitionStats>> statsByManifest = Queues.newConcurrentLinkedQueue();
-    Tasks.foreach(manifests)
-        .stopOnFailure()
-        .throwFailureWhenFinished()
-        .executeWith(ThreadPools.getWorkerPool())
-        .run(manifest -> statsByManifest.add(collectStats(table, manifest, partitionType)));
-
-    return mergeStats(statsByManifest, table.specs());
-  }
-
-  /**
-   * Sorts the {@link PartitionStats} based on the partition data.
-   *
-   * @param stats collection of {@link PartitionStats} which needs to be sorted.
-   * @param partitionType unified partition schema.
-   * @return the list of {@link PartitionStats}
-   */
-  public static List<PartitionStats> sortStats(
-      Collection<PartitionStats> stats, StructType partitionType) {
-    List<PartitionStats> entries = Lists.newArrayList(stats);
-    entries.sort(partitionStatsCmp(partitionType));
-    return entries;
-  }
-
-  private static Comparator<PartitionStats> partitionStatsCmp(StructType partitionType) {
-    return Comparator.comparing(PartitionStats::partition, Comparators.forType(partitionType));
-  }
-
-  private static PartitionMap<PartitionStats> collectStats(
-      Table table, ManifestFile manifest, StructType partitionType) {
-    try (ManifestReader<?> reader = openManifest(table, manifest)) {
-      PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
-      int specId = manifest.partitionSpecId();
-      PartitionSpec spec = table.specs().get(specId);
-      PartitionData keyTemplate = new PartitionData(partitionType);
-
-      for (ManifestEntry<?> entry : reader.entries()) {
-        ContentFile<?> file = entry.file();
-        StructLike coercedPartition =
-            PartitionUtil.coercePartition(partitionType, spec, file.partition());
-        StructLike key = keyTemplate.copyFor(coercedPartition);
-        Snapshot snapshot = table.snapshot(entry.snapshotId());
-        PartitionStats stats =
-            statsMap.computeIfAbsent(
-                specId,
-                ((PartitionData) file.partition()).copy(),
-                () -> new PartitionStats(key, specId));
-        if (entry.isLive()) {
-          stats.liveEntry(file, snapshot);
-        } else {
-          stats.deletedEntry(snapshot);
-        }
-      }
-
-      return statsMap;
-    } catch (IOException e) {
-      throw new UncheckedIOException(e);
-    }
-  }
-
-  private static ManifestReader<?> openManifest(Table table, ManifestFile manifest) {
-    List<String> projection = BaseScan.scanColumns(manifest.content());
-    return ManifestFiles.open(manifest, table.io()).select(projection);
-  }
-
-  private static Collection<PartitionStats> mergeStats(
-      Queue<PartitionMap<PartitionStats>> statsByManifest, Map<Integer, PartitionSpec> specs) {
-    PartitionMap<PartitionStats> statsMap = PartitionMap.create(specs);
-
-    for (PartitionMap<PartitionStats> stats : statsByManifest) {
-      stats.forEach(
-          (key, value) ->
-              statsMap.merge(
-                  key,
-                  value,
-                  (existingEntry, newEntry) -> {
-                    existingEntry.appendStats(newEntry);
-                    return existingEntry;
-                  }));
-    }
-
-    return statsMap.values();
-  }
-}
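
Migration note: per the deprecation text, users of the removed
PartitionStatsUtil move to org.apache.iceberg.PartitionStatsHandler. A hedged
sketch, assuming the handler's computeAndWriteStatsFile entry point (check the
handler for the exact signatures):

    import org.apache.iceberg.PartitionStatisticsFile;
    import org.apache.iceberg.PartitionStatsHandler;

    // replaces computeStats(...) + sortStats(...) plus manual stats-file writing
    PartitionStatisticsFile statsFile = PartitionStatsHandler.computeAndWriteStatsFile(table);
    table.updatePartitionStatistics().setPartitionStatistics(statsFile).commit();
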
diff --git a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
index e947135124..133a156af0 100644
--- a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
+++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
@@ -312,42 +312,6 @@ public class RewriteTablePathUtil {
     return manifestFiles;
   }
 
-  /**
-   * Rewrite a data manifest, replacing path references.
-   *
-   * @param manifestFile source manifest file to rewrite
-   * @param outputFile output file to rewrite manifest file to
-   * @param io file io
-   * @param format format of the manifest file
-   * @param specsById map of partition specs by id
-   * @param sourcePrefix source prefix that will be replaced
-   * @param targetPrefix target prefix that will replace it
-   * @return a copy plan of content files in the manifest that was rewritten
-   * @deprecated since 1.10.0, will be removed in 1.11.0
-   */
-  @Deprecated
-  public static RewriteResult<DataFile> rewriteDataManifest(
-      ManifestFile manifestFile,
-      OutputFile outputFile,
-      FileIO io,
-      int format,
-      Map<Integer, PartitionSpec> specsById,
-      String sourcePrefix,
-      String targetPrefix)
-      throws IOException {
-    PartitionSpec spec = specsById.get(manifestFile.partitionSpecId());
-    try (ManifestWriter<DataFile> writer =
-            ManifestFiles.write(format, spec, outputFile, manifestFile.snapshotId());
-        ManifestReader<DataFile> reader =
-            ManifestFiles.read(manifestFile, io, specsById).select(Arrays.asList("*"))) {
-      return StreamSupport.stream(reader.entries().spliterator(), false)
-          .map(
-              entry ->
-                  writeDataFileEntry(entry, Set.of(), spec, sourcePrefix, targetPrefix, writer))
-          .reduce(new RewriteResult<>(), RewriteResult::append);
-    }
-  }
-
   /**
    * Rewrite a data manifest, replacing path references.
    *
@@ -384,47 +348,6 @@ public class RewriteTablePathUtil {
     }
   }
 
-  /**
-   * Rewrite a delete manifest, replacing path references.
-   *
-   * @param manifestFile source delete manifest to rewrite
-   * @param outputFile output file to rewrite manifest file to
-   * @param io file io
-   * @param format format of the manifest file
-   * @param specsById map of partition specs by id
-   * @param sourcePrefix source prefix that will be replaced
-   * @param targetPrefix target prefix that will replace it
-   * @param stagingLocation staging location for rewritten files (referred delete file will be
-   *     rewritten here)
-   * @return a copy plan of content files in the manifest that was rewritten
-   * @deprecated since 1.10.0, will be removed in 1.11.0
-   */
-  @Deprecated
-  public static RewriteResult<DeleteFile> rewriteDeleteManifest(
-      ManifestFile manifestFile,
-      OutputFile outputFile,
-      FileIO io,
-      int format,
-      Map<Integer, PartitionSpec> specsById,
-      String sourcePrefix,
-      String targetPrefix,
-      String stagingLocation)
-      throws IOException {
-    PartitionSpec spec = specsById.get(manifestFile.partitionSpecId());
-    try (ManifestWriter<DeleteFile> writer =
-            ManifestFiles.writeDeleteManifest(format, spec, outputFile, manifestFile.snapshotId());
-        ManifestReader<DeleteFile> reader =
-            ManifestFiles.readDeleteManifest(manifestFile, io, specsById)
-                .select(Arrays.asList("*"))) {
-      return StreamSupport.stream(reader.entries().spliterator(), false)
-          .map(
-              entry ->
-                  writeDeleteFileEntry(
-                      entry, Set.of(), spec, sourcePrefix, targetPrefix, stagingLocation, writer))
-          .reduce(new RewriteResult<>(), RewriteResult::append);
-    }
-  }
-
   /**
    * Rewrite a delete manifest, replacing path references.
    *
@@ -717,20 +640,6 @@ public class RewriteTablePathUtil {
     return path.endsWith(FILE_SEPARATOR) ? path : path + FILE_SEPARATOR;
   }
 
-  /**
-   * Construct a staging path under a given staging directory
-   *
-   * @param originalPath source path
-   * @param stagingDir staging directory
-   * @return a staging path under the staging directory, based on the original path
-   * @deprecated since 1.10.0, will be removed in 1.11.0. Use {@link #stagingPath(String, String,
-   *     String)} instead to avoid filename conflicts
-   */
-  @Deprecated
-  public static String stagingPath(String originalPath, String stagingDir) {
-    return stagingDir + fileName(originalPath);
-  }
-
   /**
   * Construct a staging path under a given staging directory, preserving relative directory
   * structure to avoid conflicts when multiple files have the same name but different paths.
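
Migration note: the removed two-argument stagingPath dropped everything but the
file name, which could collide when two files shared a name; the surviving
three-argument form also takes the source prefix. Matching the updated test
below:

    // was: RewriteTablePathUtil.stagingPath("/some/path/file.parquet", "/staging/")
    String staged =
        RewriteTablePathUtil.stagingPath("/some/path/file.parquet", "/some/path", "/staging/");
    // staged == "/staging/file.parquet"; files nested under the prefix keep their relative structure
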
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
index 396396f41b..eeeeeab8a6 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
@@ -294,14 +294,6 @@ public class TableMetadataParser {
     return read(io.newInputFile(path));
   }
 
-  /**
-   * @deprecated since 1.10.0, will be removed in 1.11.0; use {@link #read(InputFile)} instead.
-   */
-  @Deprecated
-  public static TableMetadata read(FileIO io, InputFile file) {
-    return read(file);
-  }
-
   public static TableMetadata read(InputFile file) {
     Codec codec = Codec.fromFileName(file.location());
     try (InputStream is =
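
Migration note: the FileIO argument of the removed overload was already
ignored, so the change is mechanical. A sketch, assuming a FileIO named io and
a metadata path named metadataLocation:

    // was: TableMetadata metadata = TableMetadataParser.read(io, inputFile);
    TableMetadata metadata = TableMetadataParser.read(io.newInputFile(metadataLocation));
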
diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java
index 922cac455d..36efde6299 100644
--- a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java
+++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java
@@ -71,18 +71,6 @@ public class EncryptionUtil {
     return kmsClient;
   }
 
-  /**
-   * Create a standard encryption manager.
-   *
-   * @deprecated will be removed in 1.11.0; use {@link #createEncryptionManager(List, Map,
-   *     KeyManagementClient)} instead.
-   */
-  @Deprecated
-  public static EncryptionManager createEncryptionManager(
-      Map<String, String> tableProperties, KeyManagementClient kmsClient) {
-    return createEncryptionManager(List.of(), tableProperties, kmsClient);
-  }
-
   static EncryptionManager createEncryptionManager(
       List<EncryptedKey> keys, Map<String, String> tableProperties, KeyManagementClient kmsClient) {
     Preconditions.checkArgument(kmsClient != null, "Invalid KMS client: null");
diff --git a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java
index d931b49610..c2b47e6e94 100644
--- a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java
+++ b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java
@@ -48,7 +48,6 @@ import org.apache.iceberg.rest.HTTPRequest;
 import org.apache.iceberg.rest.ImmutableHTTPRequest;
 import org.apache.iceberg.rest.RESTClient;
 import org.apache.iceberg.rest.RESTUtil;
-import org.apache.iceberg.rest.ResourcePaths;
 import org.apache.iceberg.rest.responses.OAuthTokenResponse;
 import org.apache.iceberg.util.JsonUtil;
 import org.apache.iceberg.util.Pair;
@@ -186,7 +185,12 @@ public class OAuth2Util {
       }
 
       return fetchToken(
-          client, Map.of(), config.credential(), config.scope(), config.oauth2ServerUri());
+          client,
+          Map.of(),
+          config.credential(),
+          config.scope(),
+          config.oauth2ServerUri(),
+          ImmutableMap.of());
     }
   }
 
@@ -221,59 +225,6 @@ public class OAuth2Util {
     return response;
   }
 
-  /**
-   * @deprecated since 1.10.0, will be removed in 1.11.0; use {@link
-   *     OAuth2Util#exchangeToken(RESTClient, Map, String, String, String, String, String, String,
-   *     Map)} instead.
-   */
-  @Deprecated
-  public static OAuthTokenResponse exchangeToken(
-      RESTClient client,
-      Map<String, String> headers,
-      String subjectToken,
-      String subjectTokenType,
-      String actorToken,
-      String actorTokenType,
-      String scope) {
-    return exchangeToken(
-        client,
-        headers,
-        subjectToken,
-        subjectTokenType,
-        actorToken,
-        actorTokenType,
-        scope,
-        ResourcePaths.tokens(),
-        ImmutableMap.of());
-  }
-
-  /**
-   * @deprecated since 1.10.0, will be removed in 1.11.0; use {@link
-   *     OAuth2Util#exchangeToken(RESTClient, Map, String, String, String, String, String, String,
-   *     Map)} instead.
-   */
-  @Deprecated
-  public static OAuthTokenResponse exchangeToken(
-      RESTClient client,
-      Map<String, String> headers,
-      String subjectToken,
-      String subjectTokenType,
-      String actorToken,
-      String actorTokenType,
-      String scope,
-      String oauth2ServerUri) {
-    return exchangeToken(
-        client,
-        headers,
-        subjectToken,
-        subjectTokenType,
-        actorToken,
-        actorTokenType,
-        scope,
-        oauth2ServerUri,
-        ImmutableMap.of());
-  }
-
   public static OAuthTokenResponse fetchToken(
       RESTClient client,
       Map<String, String> headers,
@@ -299,33 +250,6 @@ public class OAuth2Util {
     return response;
   }
 
-  /**
-   * @deprecated since 1.10.0, will be removed in 1.11.0; use {@link
-   *     OAuth2Util#fetchToken(RESTClient, Map, String, String, String, Map)} instead.
-   */
-  @Deprecated
-  public static OAuthTokenResponse fetchToken(
-      RESTClient client, Map<String, String> headers, String credential, String scope) {
-
-    return fetchToken(
-        client, headers, credential, scope, ResourcePaths.tokens(), ImmutableMap.of());
-  }
-
-  /**
-   * @deprecated since 1.10.0, will be removed in 1.11.0; use {@link
-   *     OAuth2Util#fetchToken(RESTClient, Map, String, String, String, Map)} instead.
-   */
-  @Deprecated
-  public static OAuthTokenResponse fetchToken(
-      RESTClient client,
-      Map<String, String> headers,
-      String credential,
-      String scope,
-      String oauth2ServerUri) {
-
-    return fetchToken(client, headers, credential, scope, oauth2ServerUri, ImmutableMap.of());
-  }
-
   private static Map<String, String> tokenExchangeRequest(
       String subjectToken,
       String subjectTokenType,
@@ -638,7 +562,8 @@ public class OAuth2Util {
         return refreshToken(
             client, config, basicHeaders, token(), tokenType(), optionalOAuthParams());
       } else {
-        return fetchToken(client, Map.of(), credential(), scope(), oauth2ServerUri());
+        return fetchToken(
+            client, Map.of(), credential(), scope(), oauth2ServerUri(), ImmutableMap.of());
       }
     }
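
Migration note: the surviving fetchToken and exchangeToken overloads take the
OAuth2 server URI and a map of optional parameters explicitly, mirroring the
in-tree call sites updated above. A sketch for the shortest removed fetchToken
overload (client, headers, credential, and scope as before):

    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
    import org.apache.iceberg.rest.ResourcePaths;
    import org.apache.iceberg.rest.responses.OAuthTokenResponse;

    // was: OAuth2Util.fetchToken(client, headers, credential, scope)
    OAuthTokenResponse token =
        OAuth2Util.fetchToken(
            client, headers, credential, scope, ResourcePaths.tokens(), ImmutableMap.of());
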
 
diff --git a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
index cf39e4611b..71fdc9507d 100644
--- a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java
@@ -617,7 +617,7 @@ public abstract class PartitionStatsHandlerTestBase {
     Table testTable =
         TestTables.create(tempDir("old_schema"), "old_schema", SCHEMA, spec, 
2, fileFormatProperty);
     Types.StructType partitionType = Partitioning.partitionType(testTable);
-    Schema newSchema = PartitionStatsHandler.schema(partitionType);
+    Schema newSchema = PartitionStatsHandler.schema(partitionType, 2);
     Schema oldSchema = invalidOldSchema(partitionType);
 
     PartitionStatisticsFile invalidStatisticsFile =
@@ -667,7 +667,7 @@ public abstract class PartitionStatsHandlerTestBase {
     List<PartitionStats> partitionStats;
     try (CloseableIterable<PartitionStats> recordIterator =
         PartitionStatsHandler.readPartitionStatsFile(
-            PartitionStatsHandler.schema(partitionType),
+            PartitionStatsHandler.schema(partitionType, 2),
             testTable.io().newInputFile(statisticsFile.path()))) {
       partitionStats = Lists.newArrayList(recordIterator);
     }
diff --git a/core/src/test/java/org/apache/iceberg/TestPartitionStatsUtil.java b/core/src/test/java/org/apache/iceberg/TestPartitionStatsUtil.java
deleted file mode 100644
index 98e75f2626..0000000000
--- a/core/src/test/java/org/apache/iceberg/TestPartitionStatsUtil.java
+++ /dev/null
@@ -1,588 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg;
-
-import static org.apache.iceberg.types.Types.NestedField.optional;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.types.Types;
-import org.assertj.core.groups.Tuple;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-/**
- * @deprecated since 1.10.0, will be removed in 1.11.0; covered by `PartitionStatsHandlerTestBase`.
- */
-@Deprecated
-public class TestPartitionStatsUtil {
-  private static final Schema SCHEMA =
-      new Schema(
-          optional(1, "c1", Types.IntegerType.get()),
-          optional(2, "c2", Types.StringType.get()),
-          optional(3, "c3", Types.StringType.get()));
-
-  protected static final PartitionSpec SPEC =
-      PartitionSpec.builderFor(SCHEMA).identity("c2").identity("c3").build();
-
-  @TempDir public File temp;
-
-  @Test
-  public void testPartitionStatsOnEmptyTable() throws Exception {
-    Table testTable = TestTables.create(tempDir("empty_table"), "empty_table", 
SCHEMA, SPEC, 2);
-    assertThatThrownBy(
-            () -> PartitionStatsUtil.computeStats(testTable, testTable.currentSnapshot()))
-        .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("snapshot cannot be null");
-  }
-
-  @Test
-  public void testPartitionStatsOnUnPartitionedTable() throws Exception {
-    Table testTable =
-        TestTables.create(
-            tempDir("unpartitioned_table"),
-            "unpartitioned_table",
-            SCHEMA,
-            PartitionSpec.unpartitioned(),
-            2);
-
-    List<DataFile> files = prepareDataFiles(testTable);
-    AppendFiles appendFiles = testTable.newAppend();
-    files.forEach(appendFiles::appendFile);
-    appendFiles.commit();
-
-    assertThatThrownBy(
-            () -> PartitionStatsUtil.computeStats(testTable, testTable.currentSnapshot()))
-        .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("table must be partitioned");
-  }
-
-  @Test
-  public void testPartitionStats() throws Exception {
-    Table testTable =
-        TestTables.create(
-            tempDir("partition_stats_compute"), "partition_stats_compute", 
SCHEMA, SPEC, 2);
-
-    List<DataFile> files = prepareDataFiles(testTable);
-    for (int i = 0; i < 3; i++) {
-      // insert same set of records thrice to have a new manifest files
-      AppendFiles appendFiles = testTable.newAppend();
-      files.forEach(appendFiles::appendFile);
-      appendFiles.commit();
-    }
-
-    Snapshot snapshot1 = testTable.currentSnapshot();
-    Types.StructType partitionType = Partitioning.partitionType(testTable);
-    computeAndValidatePartitionStats(
-        testTable,
-        Tuple.tuple(
-            partitionData(partitionType, "foo", "A"),
-            0,
-            3 * files.get(0).recordCount(),
-            3,
-            3 * files.get(0).fileSizeInBytes(),
-            0L,
-            0,
-            0L,
-            0,
-            null,
-            snapshot1.timestampMillis(),
-            snapshot1.snapshotId()),
-        Tuple.tuple(
-            partitionData(partitionType, "foo", "B"),
-            0,
-            3 * files.get(1).recordCount(),
-            3,
-            3 * files.get(1).fileSizeInBytes(),
-            0L,
-            0,
-            0L,
-            0,
-            null,
-            snapshot1.timestampMillis(),
-            snapshot1.snapshotId()),
-        Tuple.tuple(
-            partitionData(partitionType, "bar", "A"),
-            0,
-            3 * files.get(2).recordCount(),
-            3,
-            3 * files.get(2).fileSizeInBytes(),
-            0L,
-            0,
-            0L,
-            0,
-            null,
-            snapshot1.timestampMillis(),
-            snapshot1.snapshotId()),
-        Tuple.tuple(
-            partitionData(partitionType, "bar", "B"),
-            0,
-            3 * files.get(3).recordCount(),
-            3,
-            3 * files.get(3).fileSizeInBytes(),
-            0L,
-            0,
-            0L,
-            0,
-            null,
-            snapshot1.timestampMillis(),
-            snapshot1.snapshotId()));
-
-    DeleteFile deleteFile =
-        FileGenerationUtil.generatePositionDeleteFile(testTable, TestHelpers.Row.of("foo", "A"));
-    testTable.newRowDelta().addDeletes(deleteFile).commit();
-    Snapshot snapshot2 = testTable.currentSnapshot();
-
-    DeleteFile eqDelete =
-        FileGenerationUtil.generateEqualityDeleteFile(testTable, TestHelpers.Row.of("bar", "B"));
-    testTable.newRowDelta().addDeletes(eqDelete).commit();
-    Snapshot snapshot3 = testTable.currentSnapshot();
-
-    computeAndValidatePartitionStats(
-        testTable,
-        Tuple.tuple(
-            partitionData(partitionType, "foo", "A"),
-            0,
-            3 * files.get(0).recordCount(),
-            3,
-            3 * files.get(0).fileSizeInBytes(),
-            deleteFile.recordCount(), // position delete file count
-            1, // one position delete file
-            0L,
-            0,
-            null,
-            snapshot2.timestampMillis(), // new snapshot from pos delete commit
-            snapshot2.snapshotId()),
-        Tuple.tuple(
-            partitionData(partitionType, "foo", "B"),
-            0,
-            3 * files.get(1).recordCount(),
-            3,
-            3 * files.get(1).fileSizeInBytes(),
-            0L,
-            0,
-            0L,
-            0,
-            null,
-            snapshot1.timestampMillis(),
-            snapshot1.snapshotId()),
-        Tuple.tuple(
-            partitionData(partitionType, "bar", "A"),
-            0,
-            3 * files.get(2).recordCount(),
-            3,
-            3 * files.get(2).fileSizeInBytes(),
-            0L,
-            0,
-            0L,
-            0,
-            null,
-            snapshot1.timestampMillis(),
-            snapshot1.snapshotId()),
-        Tuple.tuple(
-            partitionData(partitionType, "bar", "B"),
-            0,
-            3 * files.get(3).recordCount(),
-            3,
-            3 * files.get(3).fileSizeInBytes(),
-            0L,
-            0,
-            eqDelete.recordCount(),
-            1, // one equality delete file
-            null,
-            snapshot3.timestampMillis(), // new snapshot from equality delete commit
-            snapshot3.snapshotId()));
-  }
-
-  @Test
-  @SuppressWarnings("MethodLength")
-  public void testPartitionStatsWithSchemaEvolution() throws Exception {
-    final PartitionSpec specBefore = PartitionSpec.builderFor(SCHEMA).identity("c2").build();
-
-    Table testTable =
-        TestTables.create(
-            tempDir("partition_stats_schema_evolve"),
-            "partition_stats_schema_evolve",
-            SCHEMA,
-            specBefore,
-            SortOrder.unsorted(),
-            2);
-
-    List<DataFile> dataFiles = prepareDataFilesOnePart(testTable);
-    for (int i = 0; i < 2; i++) {
-      AppendFiles appendFiles = testTable.newAppend();
-      dataFiles.forEach(appendFiles::appendFile);
-      appendFiles.commit();
-    }
-    Snapshot snapshot1 = testTable.currentSnapshot();
-    Types.StructType partitionType = Partitioning.partitionType(testTable);
-
-    computeAndValidatePartitionStats(
-        testTable,
-        Tuple.tuple(
-            partitionData(partitionType, "foo"),
-            0,
-            2 * dataFiles.get(0).recordCount(),
-            2,
-            2 * dataFiles.get(0).fileSizeInBytes(),
-            0L,
-            0,
-            0L,
-            0,
-            null,
-            snapshot1.timestampMillis(),
-            snapshot1.snapshotId()),
-        Tuple.tuple(
-            partitionData(partitionType, "bar"),
-            0,
-            2 * dataFiles.get(1).recordCount(),
-            2,
-            2 * dataFiles.get(1).fileSizeInBytes(),
-            0L,
-            0,
-            0L,
-            0,
-            null,
-            snapshot1.timestampMillis(),
-            snapshot1.snapshotId()));
-
-    // Evolve the partition spec to include c3
-    testTable.updateSpec().addField("c3").commit();
-    List<DataFile> filesWithNewSpec = prepareDataFiles(testTable);
-    filesWithNewSpec.add(
-        FileGenerationUtil.generateDataFile(testTable, TestHelpers.Row.of("bar", null)));
-    partitionType = Partitioning.partitionType(testTable);
-
-    AppendFiles appendFiles = testTable.newAppend();
-    filesWithNewSpec.forEach(appendFiles::appendFile);
-    appendFiles.commit();
-    Snapshot snapshot2 = testTable.currentSnapshot();
-
-    computeAndValidatePartitionStats(
-        testTable,
-        Tuple.tuple(
-            partitionData(partitionType, "foo", null), // unified tuple
-            0, // old spec id as the record is unmodified
-            2 * dataFiles.get(0).recordCount(),
-            2,
-            2 * dataFiles.get(0).fileSizeInBytes(),
-            0L,
-            0,
-            0L,
-            0,
-            null,
-            snapshot1.timestampMillis(),
-            snapshot1.snapshotId()),
-        Tuple.tuple(
-            partitionData(partitionType, "bar", null),
-            0, // old spec id for "bar, null" before evolution
-            2 * dataFiles.get(1).recordCount(),
-            2,
-            2 * dataFiles.get(1).fileSizeInBytes(),
-            0L,
-            0,
-            0L,
-            0,
-            null,
-            snapshot1.timestampMillis(),
-            snapshot1.snapshotId()),
-        Tuple.tuple(
-            partitionData(partitionType, "bar", null),
-            1, // new spec id for "bar, null" after evolution
-            filesWithNewSpec.get(4).recordCount(),
-            1,
-            filesWithNewSpec.get(4).fileSizeInBytes(),
-            0L,
-            0,
-            0L,
-            0,
-            null,
-            snapshot2.timestampMillis(), // new snapshot
-            snapshot2.snapshotId()),
-        Tuple.tuple(
-            partitionData(partitionType, "foo", "A"),
-            1, // new spec id
-            filesWithNewSpec.get(0).recordCount(),
-            1,
-            filesWithNewSpec.get(0).fileSizeInBytes(),
-            0L,
-            0,
-            0L,
-            0,
-            null,
-            snapshot2.timestampMillis(), // new snapshot
-            snapshot2.snapshotId()),
-        Tuple.tuple(
-            partitionData(partitionType, "foo", "B"),
-            1,
-            filesWithNewSpec.get(1).recordCount(),
-            1,
-            filesWithNewSpec.get(1).fileSizeInBytes(),
-            0L,
-            0,
-            0L,
-            0,
-            null,
-            snapshot2.timestampMillis(),
-            snapshot2.snapshotId()),
-        Tuple.tuple(
-            partitionData(partitionType, "bar", "A"),
-            1,
-            filesWithNewSpec.get(2).recordCount(),
-            1,
-            filesWithNewSpec.get(2).fileSizeInBytes(),
-            0L,
-            0,
-            0L,
-            0,
-            null,
-            snapshot2.timestampMillis(),
-            snapshot2.snapshotId()),
-        Tuple.tuple(
-            partitionData(partitionType, "bar", "B"),
-            1,
-            filesWithNewSpec.get(3).recordCount(),
-            1,
-            filesWithNewSpec.get(3).fileSizeInBytes(),
-            0L,
-            0,
-            0L,
-            0,
-            null,
-            snapshot2.timestampMillis(),
-            snapshot2.snapshotId()));
-  }
-
-  @Test
-  @SuppressWarnings("MethodLength")
-  public void testPartitionStatsWithBucketTransformSchemaEvolution() throws Exception {
-    PartitionSpec specBefore =
-        PartitionSpec.builderFor(SCHEMA).identity("c2").bucket("c1", 
2).build();
-
-    Table testTable =
-        TestTables.create(
-            tempDir("partition_stats_schema_evolve2"),
-            "partition_stats_schema_evolve2",
-            SCHEMA,
-            specBefore,
-            SortOrder.unsorted(),
-            2);
-
-    List<DataFile> dataFiles = Lists.newArrayList();
-    for (int i = 0; i < 2; i++) {
-      dataFiles.add(FileGenerationUtil.generateDataFile(testTable, TestHelpers.Row.of("foo", i)));
-    }
-
-    AppendFiles appendFiles = testTable.newAppend();
-    dataFiles.forEach(appendFiles::appendFile);
-    appendFiles.commit();
-
-    Snapshot snapshot1 = testTable.currentSnapshot();
-    Types.StructType partitionType = Partitioning.partitionType(testTable);
-
-    computeAndValidatePartitionStats(
-        testTable,
-        Tuple.tuple(
-            partitionData(partitionType, "foo", 0),
-            0,
-            dataFiles.get(0).recordCount(),
-            1,
-            dataFiles.get(0).fileSizeInBytes(),
-            0L,
-            0,
-            0L,
-            0,
-            null,
-            snapshot1.timestampMillis(),
-            snapshot1.snapshotId()),
-        Tuple.tuple(
-            partitionData(partitionType, "foo", 1),
-            0,
-            dataFiles.get(1).recordCount(),
-            1,
-            dataFiles.get(1).fileSizeInBytes(),
-            0L,
-            0,
-            0L,
-            0,
-            null,
-            snapshot1.timestampMillis(),
-            snapshot1.snapshotId()));
-
-    // Evolve the partition spec
-    testTable
-        .updateSpec()
-        .removeField(Expressions.bucket("c1", 2))
-        .addField(Expressions.bucket("c1", 4))
-        .commit();
-
-    List<DataFile> filesWithNewSpec = Lists.newArrayList();
-    for (int i = 0; i < 4; i++) {
-      filesWithNewSpec.add(
-          FileGenerationUtil.generateDataFile(testTable, TestHelpers.Row.of("bar", i)));
-    }
-
-    appendFiles = testTable.newAppend();
-    filesWithNewSpec.forEach(appendFiles::appendFile);
-    appendFiles.commit();
-
-    Snapshot snapshot2 = testTable.currentSnapshot();
-    partitionType = Partitioning.partitionType(testTable);
-
-    computeAndValidatePartitionStats(
-        testTable,
-        Tuple.tuple(
-            partitionData(partitionType, "foo", 0, null),
-            0,
-            dataFiles.get(0).recordCount(),
-            1,
-            dataFiles.get(0).fileSizeInBytes(),
-            0L,
-            0,
-            0L,
-            0,
-            null,
-            snapshot1.timestampMillis(),
-            snapshot1.snapshotId()),
-        Tuple.tuple(
-            partitionData(partitionType, "foo", 1, null),
-            0,
-            dataFiles.get(1).recordCount(),
-            1,
-            dataFiles.get(1).fileSizeInBytes(),
-            0L,
-            0,
-            0L,
-            0,
-            null,
-            snapshot1.timestampMillis(),
-            snapshot1.snapshotId()),
-        Tuple.tuple(
-            partitionData(partitionType, "bar", null, 0),
-            1,
-            filesWithNewSpec.get(0).recordCount(),
-            1,
-            filesWithNewSpec.get(0).fileSizeInBytes(),
-            0L,
-            0,
-            0L,
-            0,
-            null,
-            snapshot2.timestampMillis(),
-            snapshot2.snapshotId()),
-        Tuple.tuple(
-            partitionData(partitionType, "bar", null, 1),
-            1,
-            filesWithNewSpec.get(1).recordCount(),
-            1,
-            filesWithNewSpec.get(1).fileSizeInBytes(),
-            0L,
-            0,
-            0L,
-            0,
-            null,
-            snapshot2.timestampMillis(),
-            snapshot2.snapshotId()),
-        Tuple.tuple(
-            partitionData(partitionType, "bar", null, 2),
-            1,
-            filesWithNewSpec.get(2).recordCount(),
-            1,
-            filesWithNewSpec.get(2).fileSizeInBytes(),
-            0L,
-            0,
-            0L,
-            0,
-            null,
-            snapshot2.timestampMillis(),
-            snapshot2.snapshotId()),
-        Tuple.tuple(
-            partitionData(partitionType, "bar", null, 3),
-            1,
-            filesWithNewSpec.get(3).recordCount(),
-            1,
-            filesWithNewSpec.get(3).fileSizeInBytes(),
-            0L,
-            0,
-            0L,
-            0,
-            null,
-            snapshot2.timestampMillis(),
-            snapshot2.snapshotId()));
-  }
-
-  private static PartitionData partitionData(Types.StructType partitionType, Object... fields) {
-    PartitionData partitionData = new PartitionData(partitionType);
-    for (int i = 0; i < fields.length; i++) {
-      partitionData.set(i, fields[i]);
-    }
-
-    return partitionData;
-  }
-
-  private static List<DataFile> prepareDataFiles(Table table) {
-    List<DataFile> dataFiles = Lists.newArrayList();
-    dataFiles.add(FileGenerationUtil.generateDataFile(table, TestHelpers.Row.of("foo", "A")));
-    dataFiles.add(FileGenerationUtil.generateDataFile(table, TestHelpers.Row.of("foo", "B")));
-    dataFiles.add(FileGenerationUtil.generateDataFile(table, TestHelpers.Row.of("bar", "A")));
-    dataFiles.add(FileGenerationUtil.generateDataFile(table, TestHelpers.Row.of("bar", "B")));
-
-    return dataFiles;
-  }
-
-  private static List<DataFile> prepareDataFilesOnePart(Table table) {
-    List<DataFile> dataFiles = Lists.newArrayList();
-    dataFiles.add(FileGenerationUtil.generateDataFile(table, TestHelpers.Row.of("foo")));
-    dataFiles.add(FileGenerationUtil.generateDataFile(table, TestHelpers.Row.of("bar")));
-
-    return dataFiles;
-  }
-
-  private static void computeAndValidatePartitionStats(Table testTable, Tuple... expectedValues) {
-    // compute and commit partition stats file
-    Collection<PartitionStats> result =
-        PartitionStatsUtil.computeStats(testTable, testTable.currentSnapshot());
-
-    assertThat(result)
-        .extracting(
-            PartitionStats::partition,
-            PartitionStats::specId,
-            PartitionStats::dataRecordCount,
-            PartitionStats::dataFileCount,
-            PartitionStats::totalDataFileSizeInBytes,
-            PartitionStats::positionDeleteRecordCount,
-            PartitionStats::positionDeleteFileCount,
-            PartitionStats::equalityDeleteRecordCount,
-            PartitionStats::equalityDeleteFileCount,
-            PartitionStats::totalRecords,
-            PartitionStats::lastUpdatedAt,
-            PartitionStats::lastUpdatedSnapshotId)
-        .containsExactlyInAnyOrder(expectedValues);
-  }
-
-  private File tempDir(String folderName) throws IOException {
-    return java.nio.file.Files.createTempDirectory(temp.toPath(), folderName).toFile();
-  }
-}
diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteTablePathUtil.java b/core/src/test/java/org/apache/iceberg/TestRewriteTablePathUtil.java
index 8a688bebf5..851e423fb6 100644
--- a/core/src/test/java/org/apache/iceberg/TestRewriteTablePathUtil.java
+++ b/core/src/test/java/org/apache/iceberg/TestRewriteTablePathUtil.java
@@ -50,9 +50,10 @@ public class TestRewriteTablePathUtil {
   public void testStagingPathBackwardCompatibility() {
     // Test that the deprecated method still works
     String originalPath = "/some/path/file.parquet";
+    String sourcePrefix = "/some/path";
     String stagingDir = "/staging/";
 
-    String result = RewriteTablePathUtil.stagingPath(originalPath, stagingDir);
+    String result = RewriteTablePathUtil.stagingPath(originalPath, sourcePrefix, stagingDir);
 
     assertThat(result).isEqualTo("/staging/file.parquet");
   }
@@ -75,16 +76,8 @@ public class TestRewriteTablePathUtil {
     String stagingDir = "/staging/";
     String fileDirectlyUnderPrefix = "/source/table/file.parquet";
 
-    // Test new method
     String newMethodResult =
         RewriteTablePathUtil.stagingPath(fileDirectlyUnderPrefix, sourcePrefix, stagingDir);
-
-    // Test old deprecated method
-    String oldMethodResult = RewriteTablePathUtil.stagingPath(fileDirectlyUnderPrefix, stagingDir);
-
-    // Both methods should behave the same when there's no middle part
     assertThat(newMethodResult).isEqualTo("/staging/file.parquet");
-    assertThat(oldMethodResult).isEqualTo("/staging/file.parquet");
-    assertThat(newMethodResult).isEqualTo(oldMethodResult);
   }
 }
diff --git a/data/src/jmh/java/org/apache/iceberg/PartitionStatsHandlerBenchmark.java b/data/src/jmh/java/org/apache/iceberg/PartitionStatsHandlerBenchmark.java
index ec0bfa9306..938dc28637 100644
--- a/data/src/jmh/java/org/apache/iceberg/PartitionStatsHandlerBenchmark.java
+++ b/data/src/jmh/java/org/apache/iceberg/PartitionStatsHandlerBenchmark.java
@@ -104,7 +104,7 @@ public class PartitionStatsHandlerBenchmark {
     List<PartitionStats> stats;
     try (CloseableIterable<PartitionStats> recordIterator =
         PartitionStatsHandler.readPartitionStatsFile(
-            PartitionStatsHandler.schema(Partitioning.partitionType(table)),
+            PartitionStatsHandler.schema(Partitioning.partitionType(table), 2),
             Files.localInput(statisticsFile.path()))) {
       stats = Lists.newArrayList(recordIterator);
     }
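
For readers tracking the API change above: schema(StructType) is gone, and the
updated call sites pass an extra int (2 here). That value reads like a table
format version, but the diff itself does not say so; treat that interpretation
as an assumption. A sketch of the updated read path, mirroring the benchmark:

    // The second argument to schema() is new; the value 2 simply mirrors what
    // the updated call sites in this commit pass.
    Schema dataSchema = PartitionStatsHandler.schema(Partitioning.partitionType(table), 2);
    try (CloseableIterable<PartitionStats> rows =
        PartitionStatsHandler.readPartitionStatsFile(dataSchema, Files.localInput(statisticsFile.path()))) {
      List<PartitionStats> stats = Lists.newArrayList(rows);
    }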
diff --git a/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java b/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java
deleted file mode 100644
index e4901f4e8c..0000000000
--- a/data/src/main/java/org/apache/iceberg/data/PartitionStatsHandler.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.data;
-
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Locale;
-import java.util.UUID;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.HasTableOperations;
-import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.PartitionStatisticsFile;
-import org.apache.iceberg.PartitionStats;
-import org.apache.iceberg.PartitionStatsUtil;
-import org.apache.iceberg.Partitioning;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.avro.InternalReader;
-import org.apache.iceberg.data.parquet.InternalWriter;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.DataWriter;
-import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.types.Types.IntegerType;
-import org.apache.iceberg.types.Types.LongType;
-import org.apache.iceberg.types.Types.NestedField;
-import org.apache.iceberg.types.Types.StructType;
-
-/**
- * Computes, writes and reads the {@link PartitionStatisticsFile}. Uses generic readers and writers
- * to support writing and reading of the stats in table default format.
- *
- * @deprecated since 1.10.0, will be removed in 1.11.0; use {@link
- *     org.apache.iceberg.PartitionStatsHandler} from core module
- */
-@Deprecated
-public class PartitionStatsHandler {
-
-  private PartitionStatsHandler() {}
-
-  public static final int PARTITION_FIELD_ID = 0;
-  public static final String PARTITION_FIELD_NAME = "partition";
-  public static final NestedField SPEC_ID = NestedField.required(1, "spec_id", IntegerType.get());
-  public static final NestedField DATA_RECORD_COUNT =
-      NestedField.required(2, "data_record_count", LongType.get());
-  public static final NestedField DATA_FILE_COUNT =
-      NestedField.required(3, "data_file_count", IntegerType.get());
-  public static final NestedField TOTAL_DATA_FILE_SIZE_IN_BYTES =
-      NestedField.required(4, "total_data_file_size_in_bytes", LongType.get());
-  public static final NestedField POSITION_DELETE_RECORD_COUNT =
-      NestedField.optional(5, "position_delete_record_count", LongType.get());
-  public static final NestedField POSITION_DELETE_FILE_COUNT =
-      NestedField.optional(6, "position_delete_file_count", IntegerType.get());
-  public static final NestedField EQUALITY_DELETE_RECORD_COUNT =
-      NestedField.optional(7, "equality_delete_record_count", LongType.get());
-  public static final NestedField EQUALITY_DELETE_FILE_COUNT =
-      NestedField.optional(8, "equality_delete_file_count", IntegerType.get());
-  public static final NestedField TOTAL_RECORD_COUNT =
-      NestedField.optional(9, "total_record_count", LongType.get());
-  public static final NestedField LAST_UPDATED_AT =
-      NestedField.optional(10, "last_updated_at", LongType.get());
-  public static final NestedField LAST_UPDATED_SNAPSHOT_ID =
-      NestedField.optional(11, "last_updated_snapshot_id", LongType.get());
-
-  /**
-   * Generates the partition stats file schema based on a combined partition type which considers
-   * all specs in a table.
-   *
-   * @param unifiedPartitionType unified partition schema type. Could be calculated by {@link
-   *     Partitioning#partitionType(Table)}.
-   * @return a schema that corresponds to the provided unified partition type.
-   */
-  public static Schema schema(StructType unifiedPartitionType) {
-    Preconditions.checkState(!unifiedPartitionType.fields().isEmpty(), "Table must be partitioned");
-    return new Schema(
-        NestedField.required(PARTITION_FIELD_ID, PARTITION_FIELD_NAME, unifiedPartitionType),
-        SPEC_ID,
-        DATA_RECORD_COUNT,
-        DATA_FILE_COUNT,
-        TOTAL_DATA_FILE_SIZE_IN_BYTES,
-        POSITION_DELETE_RECORD_COUNT,
-        POSITION_DELETE_FILE_COUNT,
-        EQUALITY_DELETE_RECORD_COUNT,
-        EQUALITY_DELETE_FILE_COUNT,
-        TOTAL_RECORD_COUNT,
-        LAST_UPDATED_AT,
-        LAST_UPDATED_SNAPSHOT_ID);
-  }
-
-  /**
-   * Computes and writes the {@link PartitionStatisticsFile} for a given table's current snapshot.
-   *
-   * @param table The {@link Table} for which the partition statistics is computed.
-   * @return {@link PartitionStatisticsFile} for the current snapshot, or null if no statistics are
-   *     present.
-   */
-  public static PartitionStatisticsFile computeAndWriteStatsFile(Table table) throws IOException {
-    if (table.currentSnapshot() == null) {
-      return null;
-    }
-
-    return computeAndWriteStatsFile(table, table.currentSnapshot().snapshotId());
-  }
-
-  /**
-   * Computes and writes the {@link PartitionStatisticsFile} for a given table and snapshot.
-   *
-   * @param table The {@link Table} for which the partition statistics is computed.
-   * @param snapshotId snapshot for which partition statistics are computed.
-   * @return {@link PartitionStatisticsFile} for the given snapshot, or null if no statistics are
-   *     present.
-   */
-  public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long snapshotId)
-      throws IOException {
-    Snapshot snapshot = table.snapshot(snapshotId);
-    Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s", snapshotId);
-
-    Collection<PartitionStats> stats = PartitionStatsUtil.computeStats(table, snapshot);
-    if (stats.isEmpty()) {
-      return null;
-    }
-
-    StructType partitionType = Partitioning.partitionType(table);
-    List<PartitionStats> sortedStats = PartitionStatsUtil.sortStats(stats, partitionType);
-    return writePartitionStatsFile(
-        table, snapshot.snapshotId(), schema(partitionType), sortedStats);
-  }
-
-  @VisibleForTesting
-  static PartitionStatisticsFile writePartitionStatsFile(
-      Table table, long snapshotId, Schema dataSchema, Iterable<PartitionStats> records)
-      throws IOException {
-    FileFormat fileFormat =
-        FileFormat.fromString(
-            table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT));
-
-    OutputFile outputFile = newPartitionStatsFile(table, fileFormat, snapshotId);
-
-    try (DataWriter<StructLike> writer = dataWriter(dataSchema, outputFile, fileFormat)) {
-      records.iterator().forEachRemaining(writer::write);
-    }
-
-    return ImmutableGenericPartitionStatisticsFile.builder()
-        .snapshotId(snapshotId)
-        .path(outputFile.location())
-        .fileSizeInBytes(outputFile.toInputFile().getLength())
-        .build();
-  }
-
-  /**
-   * Reads partition statistics from the specified {@link InputFile} using given schema.
-   *
-   * @param schema The {@link Schema} of the partition statistics file.
-   * @param inputFile An {@link InputFile} pointing to the partition stats file.
-   */
-  public static CloseableIterable<PartitionStats> readPartitionStatsFile(
-      Schema schema, InputFile inputFile) {
-    CloseableIterable<StructLike> records = dataReader(schema, inputFile);
-    return CloseableIterable.transform(records, PartitionStatsHandler::recordToPartitionStats);
-  }
-
-  private static OutputFile newPartitionStatsFile(
-      Table table, FileFormat fileFormat, long snapshotId) {
-    Preconditions.checkArgument(
-        table instanceof HasTableOperations,
-        "Table must have operations to retrieve metadata location");
-
-    return table
-        .io()
-        .newOutputFile(
-            ((HasTableOperations) table)
-                .operations()
-                .metadataFileLocation(
-                    fileFormat.addExtension(
-                        String.format(
-                            Locale.ROOT, "partition-stats-%d-%s", snapshotId, UUID.randomUUID()))));
-  }
-
-  private static DataWriter<StructLike> dataWriter(
-      Schema dataSchema, OutputFile outputFile, FileFormat fileFormat) throws IOException {
-    switch (fileFormat) {
-      case PARQUET:
-        return Parquet.writeData(outputFile)
-            .schema(dataSchema)
-            .createWriterFunc(InternalWriter::createWriter)
-            .withSpec(PartitionSpec.unpartitioned())
-            .build();
-      case AVRO:
-        return Avro.writeData(outputFile)
-            .schema(dataSchema)
-            .createWriterFunc(org.apache.iceberg.avro.InternalWriter::create)
-            .withSpec(PartitionSpec.unpartitioned())
-            .build();
-      case ORC:
-        // Internal writers are not supported for ORC yet.
-      default:
-        throw new UnsupportedOperationException("Unsupported file format:" + fileFormat.name());
-    }
-  }
-
-  private static CloseableIterable<StructLike> dataReader(Schema schema, InputFile inputFile) {
-    FileFormat fileFormat = FileFormat.fromFileName(inputFile.location());
-    Preconditions.checkArgument(
-        fileFormat != null, "Unable to determine format of file: %s", inputFile.location());
-
-    switch (fileFormat) {
-      case PARQUET:
-        return Parquet.read(inputFile)
-            .project(schema)
-            .createReaderFunc(
-                fileSchema ->
-                    org.apache.iceberg.data.parquet.InternalReader.create(schema, fileSchema))
-            .build();
-      case AVRO:
-        return Avro.read(inputFile)
-            .project(schema)
-            .createReaderFunc(fileSchema -> InternalReader.create(schema))
-            .build();
-      case ORC:
-        // Internal readers are not supported for ORC yet.
-      default:
-        throw new UnsupportedOperationException("Unsupported file format:" + fileFormat.name());
-    }
-  }
-
-  private static PartitionStats recordToPartitionStats(StructLike record) {
-    PartitionStats stats =
-        new PartitionStats(
-            record.get(PARTITION_FIELD_ID, StructLike.class),
-            record.get(SPEC_ID.fieldId(), Integer.class));
-    stats.set(DATA_RECORD_COUNT.fieldId(), record.get(DATA_RECORD_COUNT.fieldId(), Long.class));
-    stats.set(DATA_FILE_COUNT.fieldId(), record.get(DATA_FILE_COUNT.fieldId(), Integer.class));
-    stats.set(
-        TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(),
-        record.get(TOTAL_DATA_FILE_SIZE_IN_BYTES.fieldId(), Long.class));
-    stats.set(
-        POSITION_DELETE_RECORD_COUNT.fieldId(),
-        record.get(POSITION_DELETE_RECORD_COUNT.fieldId(), Long.class));
-    stats.set(
-        POSITION_DELETE_FILE_COUNT.fieldId(),
-        record.get(POSITION_DELETE_FILE_COUNT.fieldId(), Integer.class));
-    stats.set(
-        EQUALITY_DELETE_RECORD_COUNT.fieldId(),
-        record.get(EQUALITY_DELETE_RECORD_COUNT.fieldId(), Long.class));
-    stats.set(
-        EQUALITY_DELETE_FILE_COUNT.fieldId(),
-        record.get(EQUALITY_DELETE_FILE_COUNT.fieldId(), Integer.class));
-    stats.set(TOTAL_RECORD_COUNT.fieldId(), record.get(TOTAL_RECORD_COUNT.fieldId(), Long.class));
-    stats.set(LAST_UPDATED_AT.fieldId(), record.get(LAST_UPDATED_AT.fieldId(), Long.class));
-    stats.set(
-        LAST_UPDATED_SNAPSHOT_ID.fieldId(),
-        record.get(LAST_UPDATED_SNAPSHOT_ID.fieldId(), Long.class));
-    return stats;
-  }
-}
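
Per the removed class's own @deprecated note, the replacement is
org.apache.iceberg.PartitionStatsHandler in the core module, so migration
should amount to an import swap. A minimal sketch, matching how the Spark
tests below use the core handler:

    // Core-module handler replaces the deleted data-module class; the
    // compute-and-commit pattern is unchanged.
    PartitionStatisticsFile statsFile = PartitionStatsHandler.computeAndWriteStatsFile(table);
    table.updatePartitionStatistics().setPartitionStatistics(statsFile).commit();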
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
index c77e17ad08..188eddbfbb 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
@@ -29,7 +29,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.iceberg.common.DynConstructors;
-import org.apache.iceberg.gcp.GCPProperties;
 import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.DelegateFileIO;
 import org.apache.iceberg.io.FileInfo;
@@ -91,23 +90,6 @@ public class GCSFileIO implements DelegateFileIO, SupportsStorageCredentials {
     this.properties = SerializableMap.copyOf(Maps.newHashMap());
   }
 
-  /**
-   * Constructor with custom storage supplier and GCP properties.
-   *
-   * <p>Calling {@link GCSFileIO#initialize(Map)} will overwrite information set in this
-   * constructor.
-   *
-   * @param storageSupplier storage supplier
-   * @param gcpProperties gcp properties
-   * @deprecated since 1.10.0, will be removed in 1.11.0; use {@link
-   *     GCSFileIO#GCSFileIO(SerializableSupplier)} with {@link GCSFileIO#initialize(Map)} instead
-   */
-  @Deprecated
-  public GCSFileIO(SerializableSupplier<Storage> storageSupplier, GCPProperties gcpProperties) {
-    this.storageSupplier = storageSupplier;
-    this.properties = SerializableMap.copyOf(gcpProperties.properties());
-  }
-
   @Override
   public InputFile newInputFile(String path) {
     return GCSInputFile.fromLocation(path, clientForStoragePath(path), metrics);
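
The removed constructor's Javadoc names its replacement directly: construct
with the storage supplier alone, then pass configuration through
initialize(Map). A minimal sketch (storageSupplier and props are illustrative
names, not taken from this commit):

    // Replaces the removed (supplier, GCPProperties) constructor; the property
    // map takes over the role of the GCPProperties argument.
    GCSFileIO io = new GCSFileIO(storageSupplier);  // SerializableSupplier<Storage>
    io.initialize(props);                           // Map<String, String>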
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputePartitionStatsAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputePartitionStatsAction.java
index 3a1b71b380..303411eb7d 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputePartitionStatsAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputePartitionStatsAction.java
@@ -120,7 +120,7 @@ public class TestComputePartitionStatsAction extends CatalogTestBase {
     assertThat(table.partitionStatisticsFiles()).containsExactly(statisticsFile);
 
     Types.StructType partitionType = Partitioning.partitionType(table);
-    Schema dataSchema = PartitionStatsHandler.schema(partitionType);
+    Schema dataSchema = PartitionStatsHandler.schema(partitionType, 2);
     validatePartitionStats(
         statisticsFile,
         dataSchema,
@@ -209,7 +209,7 @@ public class TestComputePartitionStatsAction extends CatalogTestBase {
     assertThat(table.partitionStatisticsFiles()).containsExactly(statisticsFile);
 
     Types.StructType partitionType = Partitioning.partitionType(table);
-    Schema dataSchema = PartitionStatsHandler.schema(partitionType);
+    Schema dataSchema = PartitionStatsHandler.schema(partitionType, 2);
     // should contain stats for only partitions of snapshot1 (no entry for partition bar, A)
     validatePartitionStats(
         statisticsFile,
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 2d762a500f..ae88e04e44 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -158,7 +158,7 @@ public class TestRewriteDataFilesProcedure extends ExtensionsTestBase {
     PartitionStatisticsFile statisticsFile = PartitionStatsHandler.computeAndWriteStatsFile(table);
     table.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit();
 
-    Schema dataSchema = PartitionStatsHandler.schema(Partitioning.partitionType(table));
+    Schema dataSchema = PartitionStatsHandler.schema(Partitioning.partitionType(table), 2);
     List<PartitionStats> statsBeforeCompaction;
     try (CloseableIterable<PartitionStats> recordIterator =
         PartitionStatsHandler.readPartitionStatsFile(
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index a940186b97..1e41b3a042 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -411,7 +411,7 @@ public class TestRewriteManifestsProcedure extends ExtensionsTestBase {
     PartitionStatisticsFile statisticsFile = PartitionStatsHandler.computeAndWriteStatsFile(table);
     table.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit();
 
-    Schema dataSchema = PartitionStatsHandler.schema(Partitioning.partitionType(table));
+    Schema dataSchema = PartitionStatsHandler.schema(Partitioning.partitionType(table), 2);
     List<PartitionStats> statsBeforeRewrite;
     try (CloseableIterable<PartitionStats> recordIterator =
         PartitionStatsHandler.readPartitionStatsFile(
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputePartitionStatsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputePartitionStatsAction.java
index 3a1b71b380..303411eb7d 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputePartitionStatsAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputePartitionStatsAction.java
@@ -120,7 +120,7 @@ public class TestComputePartitionStatsAction extends CatalogTestBase {
     assertThat(table.partitionStatisticsFiles()).containsExactly(statisticsFile);
 
     Types.StructType partitionType = Partitioning.partitionType(table);
-    Schema dataSchema = PartitionStatsHandler.schema(partitionType);
+    Schema dataSchema = PartitionStatsHandler.schema(partitionType, 2);
     validatePartitionStats(
         statisticsFile,
         dataSchema,
@@ -209,7 +209,7 @@ public class TestComputePartitionStatsAction extends CatalogTestBase {
     assertThat(table.partitionStatisticsFiles()).containsExactly(statisticsFile);
 
     Types.StructType partitionType = Partitioning.partitionType(table);
-    Schema dataSchema = PartitionStatsHandler.schema(partitionType);
+    Schema dataSchema = PartitionStatsHandler.schema(partitionType, 2);
     // should contain stats for only partitions of snapshot1 (no entry for partition bar, A)
     validatePartitionStats(
         statisticsFile,
diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 1340e205dd..3aabd635bb 100644
--- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -157,7 +157,7 @@ public class TestRewriteDataFilesProcedure extends ExtensionsTestBase {
     PartitionStatisticsFile statisticsFile = PartitionStatsHandler.computeAndWriteStatsFile(table);
     table.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit();
 
-    Schema dataSchema = PartitionStatsHandler.schema(Partitioning.partitionType(table));
+    Schema dataSchema = PartitionStatsHandler.schema(Partitioning.partitionType(table), 2);
     List<PartitionStats> statsBeforeCompaction;
     try (CloseableIterable<PartitionStats> recordIterator =
         PartitionStatsHandler.readPartitionStatsFile(
diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index 1dd85f814f..a6896715ca 100644
--- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -411,7 +411,7 @@ public class TestRewriteManifestsProcedure extends ExtensionsTestBase {
     PartitionStatisticsFile statisticsFile = PartitionStatsHandler.computeAndWriteStatsFile(table);
     table.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit();
 
-    Schema dataSchema = PartitionStatsHandler.schema(Partitioning.partitionType(table));
+    Schema dataSchema = PartitionStatsHandler.schema(Partitioning.partitionType(table), 2);
     List<PartitionStats> statsBeforeRewrite;
     try (CloseableIterable<PartitionStats> recordIterator =
         PartitionStatsHandler.readPartitionStatsFile(
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputePartitionStatsAction.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputePartitionStatsAction.java
index 3a1b71b380..303411eb7d 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputePartitionStatsAction.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputePartitionStatsAction.java
@@ -120,7 +120,7 @@ public class TestComputePartitionStatsAction extends CatalogTestBase {
     assertThat(table.partitionStatisticsFiles()).containsExactly(statisticsFile);
 
     Types.StructType partitionType = Partitioning.partitionType(table);
-    Schema dataSchema = PartitionStatsHandler.schema(partitionType);
+    Schema dataSchema = PartitionStatsHandler.schema(partitionType, 2);
     validatePartitionStats(
         statisticsFile,
         dataSchema,
@@ -209,7 +209,7 @@ public class TestComputePartitionStatsAction extends CatalogTestBase {
     assertThat(table.partitionStatisticsFiles()).containsExactly(statisticsFile);
 
     Types.StructType partitionType = Partitioning.partitionType(table);
-    Schema dataSchema = PartitionStatsHandler.schema(partitionType);
+    Schema dataSchema = PartitionStatsHandler.schema(partitionType, 2);
     // should contain stats for only partitions of snapshot1 (no entry for partition bar, A)
     validatePartitionStats(
         statisticsFile,
