This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new f8072ba9b7 Spark 3.3: Remove use of deprecated SparkFilesScan (#7106)
f8072ba9b7 is described below

commit f8072ba9b7cec120237232fc31221dedd08faf54
Author: Szehon Ho <[email protected]>
AuthorDate: Tue Mar 21 13:17:46 2023 -0700

    Spark 3.3: Remove use of deprecated SparkFilesScan (#7106)
---
 .../org/apache/iceberg/spark/SparkReadConf.java    |   6 --
 .../org/apache/iceberg/spark/SparkReadOptions.java |   7 --
 .../spark/actions/SparkBinPackStrategy.java        |   6 +-
 .../iceberg/spark/actions/SparkSortStrategy.java   |   8 +-
 .../iceberg/spark/actions/SparkZOrderStrategy.java |   2 +-
 .../iceberg/spark/source/SparkFilesScan.java       | 106 ---------------------
 .../spark/source/SparkFilesScanBuilder.java        |  46 ---------
 .../apache/iceberg/spark/source/SparkTable.java    |   5 -
 .../iceberg/spark/TestFileRewriteCoordinator.java  |  18 ++--
 .../spark/actions/TestRewriteDataFilesAction.java  |   6 +-
 10 files changed, 20 insertions(+), 190 deletions(-)
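
Note for downstream users: jobs that staged file scan tasks through the removed FileScanTaskSetManager and read them back with "file-scan-task-set-id" should now stage through ScanTaskSetManager and read with "scan-task-set-id". A minimal sketch of the new read path, assuming a catalog-resolvable table name (the class name, set ID, and method are illustrative, not part of this commit):

    import java.util.UUID;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.relocated.com.google.common.collect.Lists;
    import org.apache.iceberg.spark.ScanTaskSetManager;
    import org.apache.iceberg.spark.SparkReadOptions;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    class StagedReadExample {
      // Stages the table's planned file scan tasks and reads them back as a DataFrame.
      static Dataset<Row> readStaged(SparkSession spark, Table table, String tableName) throws Exception {
        String setId = UUID.randomUUID().toString();
        try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
          // ScanTaskSetManager replaces the removed FileScanTaskSetManager.
          ScanTaskSetManager.get().stageTasks(table, setId, Lists.newArrayList(tasks));
        }
        return spark
            .read()
            .format("iceberg")
            // Previously: SparkReadOptions.FILE_SCAN_TASK_SET_ID ("file-scan-task-set-id").
            .option(SparkReadOptions.SCAN_TASK_SET_ID, setId)
            .load(tableName);
      }
    }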

diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 1ebb58866f..1c1182c4da 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -120,12 +120,6 @@ public class SparkReadConf {
     return confParser.stringConf().option(SparkReadOptions.TAG).parseOptional();
   }
 
-  /** @deprecated will be removed in 1.3.0, use {@link #scanTaskSetId()} instead */
-  @Deprecated
-  public String fileScanTaskSetId() {
-    return confParser.stringConf().option(SparkReadOptions.FILE_SCAN_TASK_SET_ID).parseOptional();
-  }
-
   public String scanTaskSetId() {
     return confParser.stringConf().option(SparkReadOptions.SCAN_TASK_SET_ID).parseOptional();
   }
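
Only the scanTaskSetId() accessor remains after this hunk. A hedged sketch of how a caller resolves the staged set ID (SparkReadConf construction mirrors the SparkFilesScanBuilder code removed later in this diff; the class name is hypothetical):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.spark.SparkReadConf;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.util.CaseInsensitiveStringMap;

    class ScanTaskSetIdLookup {
      // Returns the staged task set ID, or null when the read option was not supplied
      // (parseOptional() yields null for an absent option).
      static String stagedSetId(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
        SparkReadConf readConf = new SparkReadConf(spark, table, options);
        return readConf.scanTaskSetId();
      }
    }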
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
index cf51017baa..9063e0f9ab 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -62,13 +62,6 @@ public class SparkReadOptions {
   // Overrides the table's read.parquet.vectorization.batch-size
   public static final String VECTORIZATION_BATCH_SIZE = "batch-size";
 
-  /**
-   * Set ID that is used to fetch file scan tasks
-   *
-   * @deprecated will be removed in 1.3.0, use SCAN_TASK_SET_ID instead
-   */
-  @Deprecated public static final String FILE_SCAN_TASK_SET_ID = "file-scan-task-set-id";
-
   // Set ID that is used to fetch scan tasks
   public static final String SCAN_TASK_SET_ID = "scan-task-set-id";
 
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
index 483b06de4e..46aefd20af 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
@@ -27,7 +27,7 @@ import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.BinPackStrategy;
 import org.apache.iceberg.spark.FileRewriteCoordinator;
-import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.ScanTaskSetManager;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkTableCache;
 import org.apache.iceberg.spark.SparkWriteOptions;
@@ -39,7 +39,7 @@ public class SparkBinPackStrategy extends BinPackStrategy {
   private final Table table;
   private final SparkSession spark;
   private final SparkTableCache tableCache = SparkTableCache.get();
-  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final ScanTaskSetManager manager = ScanTaskSetManager.get();
   private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
 
   public SparkBinPackStrategy(Table table, SparkSession spark) {
@@ -63,7 +63,7 @@ public class SparkBinPackStrategy extends BinPackStrategy {
           spark
               .read()
               .format("iceberg")
-              .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
+              .option(SparkReadOptions.SCAN_TASK_SET_ID, groupID)
              .option(SparkReadOptions.SPLIT_SIZE, splitSize(inputFileSize(filesToRewrite)))
               .option(SparkReadOptions.FILE_OPEN_COST, "0")
               .load(groupID);
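
The read above is only half of the rewrite flow: the strategy then writes the packed data back, and FileRewriteCoordinator pairs the new files with the staged group for the commit. A condensed sketch of that write side; SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID is assumed to be the pairing option (this class imports SparkWriteOptions, but the write hunk is not shown in this diff), and the wrapper class is hypothetical:

    import java.util.Set;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.spark.FileRewriteCoordinator;
    import org.apache.iceberg.spark.SparkWriteOptions;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    class RewriteWriteSide {
      // Writes the packed scan back under the group ID and fetches the replacement files.
      static Set<DataFile> writeAndFetch(Dataset<Row> scanDF, Table table, String groupID) {
        scanDF
            .write()
            .format("iceberg")
            // Assumed option name: ties the written files to the staged task group.
            .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID)
            .mode("append")
            .save(groupID);

        // The coordinator exposes the new data files for the rewrite commit.
        return FileRewriteCoordinator.get().fetchNewDataFiles(table, groupID);
      }
    }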
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
index d79360034f..59aafc595a 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
@@ -30,7 +30,7 @@ import org.apache.iceberg.actions.SortStrategy;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.spark.FileRewriteCoordinator;
-import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.ScanTaskSetManager;
 import org.apache.iceberg.spark.SparkDistributionAndOrderingUtil;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkTableCache;
@@ -63,7 +63,7 @@ public class SparkSortStrategy extends SortStrategy {
   private final Table table;
   private final SparkSession spark;
   private final SparkTableCache tableCache = SparkTableCache.get();
-  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final ScanTaskSetManager manager = ScanTaskSetManager.get();
   private final FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
 
   private double sizeEstimateMultiple;
@@ -128,7 +128,7 @@ public class SparkSortStrategy extends SortStrategy {
           spark
               .read()
               .format("iceberg")
-              .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
+              .option(SparkReadOptions.SCAN_TASK_SET_ID, groupID)
               .load(groupID);
 
      // write the packed data into new files where each split becomes a new file
@@ -171,7 +171,7 @@ public class SparkSortStrategy extends SortStrategy {
     return tableCache;
   }
 
-  protected FileScanTaskSetManager manager() {
+  protected ScanTaskSetManager manager() {
     return manager;
   }
 
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
index b936fb9217..fe04c1f4f1 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
@@ -214,7 +214,7 @@ public class SparkZOrderStrategy extends SparkSortStrategy {
           spark
               .read()
               .format("iceberg")
-              .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
+              .option(SparkReadOptions.SCAN_TASK_SET_ID, groupID)
               .load(groupID);
 
       Column[] originalColumns =
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScan.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScan.java
deleted file mode 100644
index e42562843e..0000000000
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScan.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.spark.source;
-
-import java.util.List;
-import java.util.Objects;
-import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.spark.FileScanTaskSetManager;
-import org.apache.iceberg.spark.SparkReadConf;
-import org.apache.iceberg.util.TableScanUtil;
-import org.apache.spark.sql.SparkSession;
-
-/** @deprecated will be removed in 1.3.0, use {@link SparkStagedScan} instead */
-@Deprecated
-class SparkFilesScan extends SparkScan {
-  private final String taskSetID;
-  private final long splitSize;
-  private final int splitLookback;
-  private final long splitOpenFileCost;
-
-  private List<CombinedScanTask> tasks = null; // lazy cache of tasks
-
-  SparkFilesScan(SparkSession spark, Table table, SparkReadConf readConf) {
-    super(spark, table, readConf, table.schema(), ImmutableList.of());
-
-    this.taskSetID = readConf.fileScanTaskSetId();
-    this.splitSize = readConf.splitSize();
-    this.splitLookback = readConf.splitLookback();
-    this.splitOpenFileCost = readConf.splitOpenFileCost();
-  }
-
-  @Override
-  protected List<CombinedScanTask> taskGroups() {
-    if (tasks == null) {
-      FileScanTaskSetManager taskSetManager = FileScanTaskSetManager.get();
-      List<FileScanTask> files = taskSetManager.fetchTasks(table(), taskSetID);
-      ValidationException.check(
-          files != null,
-          "Task set manager has no tasks for table %s with id %s",
-          table(),
-          taskSetID);
-
-      CloseableIterable<FileScanTask> splitFiles =
-          TableScanUtil.splitFiles(CloseableIterable.withNoopClose(files), splitSize);
-      CloseableIterable<CombinedScanTask> scanTasks =
-          TableScanUtil.planTasks(
-              splitFiles, splitSize,
-              splitLookback, splitOpenFileCost);
-      this.tasks = Lists.newArrayList(scanTasks);
-    }
-
-    return tasks;
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (this == other) {
-      return true;
-    }
-
-    if (other == null || getClass() != other.getClass()) {
-      return false;
-    }
-
-    SparkFilesScan that = (SparkFilesScan) other;
-    return table().name().equals(that.table().name())
-        && Objects.equals(taskSetID, that.taskSetID)
-        && Objects.equals(splitSize, that.splitSize)
-        && Objects.equals(splitLookback, that.splitLookback)
-        && Objects.equals(splitOpenFileCost, that.splitOpenFileCost);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(table().name(), taskSetID, splitSize, splitSize, splitOpenFileCost);
-  }
-
-  @Override
-  public String toString() {
-    return String.format(
-        "IcebergFilesScan(table=%s, type=%s, taskSetID=%s, caseSensitive=%s)",
-        table(), expectedSchema().asStruct(), taskSetID, caseSensitive());
-  }
-}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScanBuilder.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScanBuilder.java
deleted file mode 100644
index d8f4a9e13a..0000000000
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScanBuilder.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.spark.source;
-
-import org.apache.iceberg.Table;
-import org.apache.iceberg.spark.SparkReadConf;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.connector.read.Scan;
-import org.apache.spark.sql.connector.read.ScanBuilder;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-
-/** @deprecated will be removed in 1.3.0, use {@link SparkStagedScanBuilder} instead */
-@Deprecated
-class SparkFilesScanBuilder implements ScanBuilder {
-
-  private final SparkSession spark;
-  private final Table table;
-  private final SparkReadConf readConf;
-
-  SparkFilesScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
-    this.spark = spark;
-    this.table = table;
-    this.readConf = new SparkReadConf(spark, table, options);
-  }
-
-  @Override
-  public Scan build() {
-    return new SparkFilesScan(spark, table, readConf);
-  }
-}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index cd501ccb52..d807e01033 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -260,11 +260,6 @@ public class SparkTable
 
   @Override
   public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
-    if (options.containsKey(SparkReadOptions.FILE_SCAN_TASK_SET_ID)) {
-      // skip planning the job and fetch already staged file scan tasks
-      return new SparkFilesScanBuilder(sparkSession(), icebergTable, options);
-    }
-
     if (options.containsKey(SparkReadOptions.SCAN_TASK_SET_ID)) {
       return new SparkStagedScanBuilder(sparkSession(), icebergTable, options);
     }
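
With this check gone, a read that still passes the removed "file-scan-task-set-id" option no longer reaches a staged-files scan: the unrecognized option appears to be silently ignored and planning falls through to a regular scan, so callers must switch to "scan-task-set-id" to read staged tasks.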
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java
index 2e6886d32d..a0e231e863 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java
@@ -74,7 +74,7 @@ public class TestFileRewriteCoordinator extends SparkCatalogTestBase {
     try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
       String fileSetID = UUID.randomUUID().toString();
 
-      FileScanTaskSetManager taskSetManager = FileScanTaskSetManager.get();
+      ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
       taskSetManager.stageTasks(table, fileSetID, Lists.newArrayList(fileScanTasks));
 
       // read and pack original 4 files into 2 splits
@@ -82,7 +82,7 @@ public class TestFileRewriteCoordinator extends SparkCatalogTestBase {
           spark
               .read()
               .format("iceberg")
-              .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, fileSetID)
+              .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
              .option(SparkReadOptions.SPLIT_SIZE, Long.toString(avgFileSize * 2))
               .option(SparkReadOptions.FILE_OPEN_COST, "0")
               .load(tableName);
@@ -97,7 +97,7 @@ public class TestFileRewriteCoordinator extends SparkCatalogTestBase {
       FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
       Set<DataFile> rewrittenFiles =
           taskSetManager.fetchTasks(table, fileSetID).stream()
-              .map(FileScanTask::file)
+              .map(t -> t.asFileScanTask().file())
               .collect(Collectors.toSet());
       Set<DataFile> addedFiles = rewriteCoordinator.fetchNewDataFiles(table, fileSetID);
       table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();
@@ -129,7 +129,7 @@ public class TestFileRewriteCoordinator extends SparkCatalogTestBase {
     try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
       String fileSetID = UUID.randomUUID().toString();
 
-      FileScanTaskSetManager taskSetManager = FileScanTaskSetManager.get();
+      ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
       taskSetManager.stageTasks(table, fileSetID, Lists.newArrayList(fileScanTasks));
 
       // read original 4 files as 4 splits
@@ -137,7 +137,7 @@ public class TestFileRewriteCoordinator extends SparkCatalogTestBase {
           spark
               .read()
               .format("iceberg")
-              .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, fileSetID)
+              .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
               .option(SparkReadOptions.SPLIT_SIZE, "134217728")
               .option(SparkReadOptions.FILE_OPEN_COST, "134217728")
               .load(tableName);
@@ -167,7 +167,7 @@ public class TestFileRewriteCoordinator extends SparkCatalogTestBase {
       FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
       Set<DataFile> rewrittenFiles =
           taskSetManager.fetchTasks(table, fileSetID).stream()
-              .map(FileScanTask::file)
+              .map(t -> t.asFileScanTask().file())
               .collect(Collectors.toSet());
       Set<DataFile> addedFiles = rewriteCoordinator.fetchNewDataFiles(table, fileSetID);
       table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();
@@ -198,7 +198,7 @@ public class TestFileRewriteCoordinator extends SparkCatalogTestBase {
     String firstFileSetID = UUID.randomUUID().toString();
     long firstFileSetSnapshotId = table.currentSnapshot().snapshotId();
 
-    FileScanTaskSetManager taskSetManager = FileScanTaskSetManager.get();
+    ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
 
     try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
       // stage first 2 files for compaction
@@ -227,7 +227,7 @@ public class TestFileRewriteCoordinator extends SparkCatalogTestBase {
           spark
               .read()
               .format("iceberg")
-              .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, fileSetID)
+              .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
               .option(SparkReadOptions.SPLIT_SIZE, Long.MAX_VALUE)
               .load(tableName);
 
@@ -243,7 +243,7 @@ public class TestFileRewriteCoordinator extends SparkCatalogTestBase {
     Set<DataFile> rewrittenFiles =
         fileSetIDs.stream()
            .flatMap(fileSetID -> taskSetManager.fetchTasks(table, fileSetID).stream())
-            .map(FileScanTask::file)
+            .map(t -> t.asFileScanTask().file())
             .collect(Collectors.toSet());
     Set<DataFile> addedFiles =
         fileSetIDs.stream()
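
These test changes follow from ScanTaskSetManager being generic over ScanTask rather than FileScanTask, so callers narrow each task before pulling its data file. A minimal sketch of the pattern (hypothetical class name; the table and fileSetID are assumed to be staged as in the tests above):

    import java.util.Set;
    import java.util.stream.Collectors;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.spark.ScanTaskSetManager;

    class StagedTaskFiles {
      // Collects the data files behind a staged task set; asFileScanTask()
      // throws if a task in the set is not a file scan task.
      static Set<DataFile> dataFiles(Table table, String fileSetID) {
        return ScanTaskSetManager.get().fetchTasks(table, fileSetID).stream()
            .map(task -> task.asFileScanTask().file())
            .collect(Collectors.toSet());
      }
    }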
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 61a855b1dd..5567abb541 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -89,7 +89,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.spark.FileRewriteCoordinator;
-import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.ScanTaskSetManager;
 import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkTestBase;
 import org.apache.iceberg.spark.actions.RewriteDataFilesSparkAction.RewriteExecutionContext;
@@ -123,7 +123,7 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
   @Rule public TemporaryFolder temp = new TemporaryFolder();
 
   private final FileRewriteCoordinator coordinator = FileRewriteCoordinator.get();
-  private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+  private final ScanTaskSetManager manager = ScanTaskSetManager.get();
   private String tableLocation = null;
 
   @Before
@@ -1675,7 +1675,7 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
 
   private Set<String> cacheContents(Table table) {
     return ImmutableSet.<String>builder()
-        .addAll(manager.fetchSetIDs(table))
+        .addAll(manager.fetchSetIds(table))
         .addAll(coordinator.fetchSetIDs(table))
         .build();
   }
