This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new f8072ba9b7 Spark 3.3: Remove use of deprecated SparkFilesScan (#7106)
f8072ba9b7 is described below
commit f8072ba9b7cec120237232fc31221dedd08faf54
Author: Szehon Ho <[email protected]>
AuthorDate: Tue Mar 21 13:17:46 2023 -0700
Spark 3.3: Remove use of deprecated SparkFilesScan (#7106)
---
.../org/apache/iceberg/spark/SparkReadConf.java | 6 --
.../org/apache/iceberg/spark/SparkReadOptions.java | 7 --
.../spark/actions/SparkBinPackStrategy.java | 6 +-
.../iceberg/spark/actions/SparkSortStrategy.java | 8 +-
.../iceberg/spark/actions/SparkZOrderStrategy.java | 2 +-
.../iceberg/spark/source/SparkFilesScan.java | 106 ---------------------
.../spark/source/SparkFilesScanBuilder.java | 46 ---------
.../apache/iceberg/spark/source/SparkTable.java | 5 -
.../iceberg/spark/TestFileRewriteCoordinator.java | 18 ++--
.../spark/actions/TestRewriteDataFilesAction.java | 6 +-
10 files changed, 20 insertions(+), 190 deletions(-)
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 1ebb58866f..1c1182c4da 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -120,12 +120,6 @@ public class SparkReadConf {
return
confParser.stringConf().option(SparkReadOptions.TAG).parseOptional();
}
- /** @deprecated will be removed in 1.3.0, use {@link #scanTaskSetId()}
instead */
- @Deprecated
- public String fileScanTaskSetId() {
- return
confParser.stringConf().option(SparkReadOptions.FILE_SCAN_TASK_SET_ID).parseOptional();
- }
-
public String scanTaskSetId() {
return
confParser.stringConf().option(SparkReadOptions.SCAN_TASK_SET_ID).parseOptional();
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
index cf51017baa..9063e0f9ab 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -62,13 +62,6 @@ public class SparkReadOptions {
// Overrides the table's read.parquet.vectorization.batch-size
public static final String VECTORIZATION_BATCH_SIZE = "batch-size";
- /**
- * Set ID that is used to fetch file scan tasks
- *
- * @deprecated will be removed in 1.3.0, use SCAN_TASK_SET_ID instead
- */
- @Deprecated public static final String FILE_SCAN_TASK_SET_ID =
"file-scan-task-set-id";
-
// Set ID that is used to fetch scan tasks
public static final String SCAN_TASK_SET_ID = "scan-task-set-id";
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
index 483b06de4e..46aefd20af 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
@@ -27,7 +27,7 @@ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.BinPackStrategy;
import org.apache.iceberg.spark.FileRewriteCoordinator;
-import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.ScanTaskSetManager;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkTableCache;
import org.apache.iceberg.spark.SparkWriteOptions;
@@ -39,7 +39,7 @@ public class SparkBinPackStrategy extends BinPackStrategy {
private final Table table;
private final SparkSession spark;
private final SparkTableCache tableCache = SparkTableCache.get();
- private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+ private final ScanTaskSetManager manager = ScanTaskSetManager.get();
private final FileRewriteCoordinator rewriteCoordinator =
FileRewriteCoordinator.get();
public SparkBinPackStrategy(Table table, SparkSession spark) {
@@ -63,7 +63,7 @@ public class SparkBinPackStrategy extends BinPackStrategy {
spark
.read()
.format("iceberg")
- .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
+ .option(SparkReadOptions.SCAN_TASK_SET_ID, groupID)
.option(SparkReadOptions.SPLIT_SIZE,
splitSize(inputFileSize(filesToRewrite)))
.option(SparkReadOptions.FILE_OPEN_COST, "0")
.load(groupID);
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
index d79360034f..59aafc595a 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
@@ -30,7 +30,7 @@ import org.apache.iceberg.actions.SortStrategy;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.spark.FileRewriteCoordinator;
-import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.ScanTaskSetManager;
import org.apache.iceberg.spark.SparkDistributionAndOrderingUtil;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkTableCache;
@@ -63,7 +63,7 @@ public class SparkSortStrategy extends SortStrategy {
private final Table table;
private final SparkSession spark;
private final SparkTableCache tableCache = SparkTableCache.get();
- private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+ private final ScanTaskSetManager manager = ScanTaskSetManager.get();
private final FileRewriteCoordinator rewriteCoordinator =
FileRewriteCoordinator.get();
private double sizeEstimateMultiple;
@@ -128,7 +128,7 @@ public class SparkSortStrategy extends SortStrategy {
spark
.read()
.format("iceberg")
- .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
+ .option(SparkReadOptions.SCAN_TASK_SET_ID, groupID)
.load(groupID);
// write the packed data into new files where each split becomes a new
file
@@ -171,7 +171,7 @@ public class SparkSortStrategy extends SortStrategy {
return tableCache;
}
- protected FileScanTaskSetManager manager() {
+ protected ScanTaskSetManager manager() {
return manager;
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
index b936fb9217..fe04c1f4f1 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
@@ -214,7 +214,7 @@ public class SparkZOrderStrategy extends SparkSortStrategy {
spark
.read()
.format("iceberg")
- .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
+ .option(SparkReadOptions.SCAN_TASK_SET_ID, groupID)
.load(groupID);
Column[] originalColumns =
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScan.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScan.java
deleted file mode 100644
index e42562843e..0000000000
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScan.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.spark.source;
-
-import java.util.List;
-import java.util.Objects;
-import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.spark.FileScanTaskSetManager;
-import org.apache.iceberg.spark.SparkReadConf;
-import org.apache.iceberg.util.TableScanUtil;
-import org.apache.spark.sql.SparkSession;
-
-/** @deprecated will be removed in 1.3.0, use {@link SparkStagedScan} instead
*/
-@Deprecated
-class SparkFilesScan extends SparkScan {
- private final String taskSetID;
- private final long splitSize;
- private final int splitLookback;
- private final long splitOpenFileCost;
-
- private List<CombinedScanTask> tasks = null; // lazy cache of tasks
-
- SparkFilesScan(SparkSession spark, Table table, SparkReadConf readConf) {
- super(spark, table, readConf, table.schema(), ImmutableList.of());
-
- this.taskSetID = readConf.fileScanTaskSetId();
- this.splitSize = readConf.splitSize();
- this.splitLookback = readConf.splitLookback();
- this.splitOpenFileCost = readConf.splitOpenFileCost();
- }
-
- @Override
- protected List<CombinedScanTask> taskGroups() {
- if (tasks == null) {
- FileScanTaskSetManager taskSetManager = FileScanTaskSetManager.get();
- List<FileScanTask> files = taskSetManager.fetchTasks(table(), taskSetID);
- ValidationException.check(
- files != null,
- "Task set manager has no tasks for table %s with id %s",
- table(),
- taskSetID);
-
- CloseableIterable<FileScanTask> splitFiles =
- TableScanUtil.splitFiles(CloseableIterable.withNoopClose(files),
splitSize);
- CloseableIterable<CombinedScanTask> scanTasks =
- TableScanUtil.planTasks(
- splitFiles, splitSize,
- splitLookback, splitOpenFileCost);
- this.tasks = Lists.newArrayList(scanTasks);
- }
-
- return tasks;
- }
-
- @Override
- public boolean equals(Object other) {
- if (this == other) {
- return true;
- }
-
- if (other == null || getClass() != other.getClass()) {
- return false;
- }
-
- SparkFilesScan that = (SparkFilesScan) other;
- return table().name().equals(that.table().name())
- && Objects.equals(taskSetID, that.taskSetID)
- && Objects.equals(splitSize, that.splitSize)
- && Objects.equals(splitLookback, that.splitLookback)
- && Objects.equals(splitOpenFileCost, that.splitOpenFileCost);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(table().name(), taskSetID, splitSize, splitSize,
splitOpenFileCost);
- }
-
- @Override
- public String toString() {
- return String.format(
- "IcebergFilesScan(table=%s, type=%s, taskSetID=%s, caseSensitive=%s)",
- table(), expectedSchema().asStruct(), taskSetID, caseSensitive());
- }
-}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScanBuilder.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScanBuilder.java
deleted file mode 100644
index d8f4a9e13a..0000000000
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScanBuilder.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.spark.source;
-
-import org.apache.iceberg.Table;
-import org.apache.iceberg.spark.SparkReadConf;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.connector.read.Scan;
-import org.apache.spark.sql.connector.read.ScanBuilder;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-
-/** @deprecated will be removed in 1.3.0, use {@link SparkStagedScanBuilder}
instead */
-@Deprecated
-class SparkFilesScanBuilder implements ScanBuilder {
-
- private final SparkSession spark;
- private final Table table;
- private final SparkReadConf readConf;
-
- SparkFilesScanBuilder(SparkSession spark, Table table,
CaseInsensitiveStringMap options) {
- this.spark = spark;
- this.table = table;
- this.readConf = new SparkReadConf(spark, table, options);
- }
-
- @Override
- public Scan build() {
- return new SparkFilesScan(spark, table, readConf);
- }
-}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index cd501ccb52..d807e01033 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -260,11 +260,6 @@ public class SparkTable
@Override
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
- if (options.containsKey(SparkReadOptions.FILE_SCAN_TASK_SET_ID)) {
- // skip planning the job and fetch already staged file scan tasks
- return new SparkFilesScanBuilder(sparkSession(), icebergTable, options);
- }
-
if (options.containsKey(SparkReadOptions.SCAN_TASK_SET_ID)) {
return new SparkStagedScanBuilder(sparkSession(), icebergTable, options);
}
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java
index 2e6886d32d..a0e231e863 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java
@@ -74,7 +74,7 @@ public class TestFileRewriteCoordinator extends SparkCatalogTestBase {
try (CloseableIterable<FileScanTask> fileScanTasks =
table.newScan().planFiles()) {
String fileSetID = UUID.randomUUID().toString();
- FileScanTaskSetManager taskSetManager = FileScanTaskSetManager.get();
+ ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
taskSetManager.stageTasks(table, fileSetID,
Lists.newArrayList(fileScanTasks));
// read and pack original 4 files into 2 splits
@@ -82,7 +82,7 @@ public class TestFileRewriteCoordinator extends SparkCatalogTestBase {
spark
.read()
.format("iceberg")
- .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, fileSetID)
+ .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
.option(SparkReadOptions.SPLIT_SIZE, Long.toString(avgFileSize *
2))
.option(SparkReadOptions.FILE_OPEN_COST, "0")
.load(tableName);
@@ -97,7 +97,7 @@ public class TestFileRewriteCoordinator extends SparkCatalogTestBase {
FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
Set<DataFile> rewrittenFiles =
taskSetManager.fetchTasks(table, fileSetID).stream()
- .map(FileScanTask::file)
+ .map(t -> t.asFileScanTask().file())
.collect(Collectors.toSet());
Set<DataFile> addedFiles = rewriteCoordinator.fetchNewDataFiles(table,
fileSetID);
table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();
@@ -129,7 +129,7 @@ public class TestFileRewriteCoordinator extends SparkCatalogTestBase {
try (CloseableIterable<FileScanTask> fileScanTasks =
table.newScan().planFiles()) {
String fileSetID = UUID.randomUUID().toString();
- FileScanTaskSetManager taskSetManager = FileScanTaskSetManager.get();
+ ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
taskSetManager.stageTasks(table, fileSetID,
Lists.newArrayList(fileScanTasks));
// read original 4 files as 4 splits
@@ -137,7 +137,7 @@ public class TestFileRewriteCoordinator extends SparkCatalogTestBase {
spark
.read()
.format("iceberg")
- .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, fileSetID)
+ .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
.option(SparkReadOptions.SPLIT_SIZE, "134217728")
.option(SparkReadOptions.FILE_OPEN_COST, "134217728")
.load(tableName);
@@ -167,7 +167,7 @@ public class TestFileRewriteCoordinator extends SparkCatalogTestBase {
FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get();
Set<DataFile> rewrittenFiles =
taskSetManager.fetchTasks(table, fileSetID).stream()
- .map(FileScanTask::file)
+ .map(t -> t.asFileScanTask().file())
.collect(Collectors.toSet());
Set<DataFile> addedFiles = rewriteCoordinator.fetchNewDataFiles(table,
fileSetID);
table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();
@@ -198,7 +198,7 @@ public class TestFileRewriteCoordinator extends SparkCatalogTestBase {
String firstFileSetID = UUID.randomUUID().toString();
long firstFileSetSnapshotId = table.currentSnapshot().snapshotId();
- FileScanTaskSetManager taskSetManager = FileScanTaskSetManager.get();
+ ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
// stage first 2 files for compaction
@@ -227,7 +227,7 @@ public class TestFileRewriteCoordinator extends SparkCatalogTestBase {
spark
.read()
.format("iceberg")
- .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, fileSetID)
+ .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
.option(SparkReadOptions.SPLIT_SIZE, Long.MAX_VALUE)
.load(tableName);
@@ -243,7 +243,7 @@ public class TestFileRewriteCoordinator extends SparkCatalogTestBase {
Set<DataFile> rewrittenFiles =
fileSetIDs.stream()
.flatMap(fileSetID -> taskSetManager.fetchTasks(table,
fileSetID).stream())
- .map(FileScanTask::file)
+ .map(t -> t.asFileScanTask().file())
.collect(Collectors.toSet());
Set<DataFile> addedFiles =
fileSetIDs.stream()
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 61a855b1dd..5567abb541 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -89,7 +89,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.spark.FileRewriteCoordinator;
-import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.ScanTaskSetManager;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.SparkTestBase;
import
org.apache.iceberg.spark.actions.RewriteDataFilesSparkAction.RewriteExecutionContext;
@@ -123,7 +123,7 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
@Rule public TemporaryFolder temp = new TemporaryFolder();
private final FileRewriteCoordinator coordinator =
FileRewriteCoordinator.get();
- private final FileScanTaskSetManager manager = FileScanTaskSetManager.get();
+ private final ScanTaskSetManager manager = ScanTaskSetManager.get();
private String tableLocation = null;
@Before
@@ -1675,7 +1675,7 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
private Set<String> cacheContents(Table table) {
return ImmutableSet.<String>builder()
- .addAll(manager.fetchSetIDs(table))
+ .addAll(manager.fetchSetIds(table))
.addAll(coordinator.fetchSetIDs(table))
.build();
}