Copilot commented on code in PR #15368:
URL: https://github.com/apache/iceberg/pull/15368#discussion_r2867656783
##########
open-api/src/testFixtures/java/org/apache/iceberg/rest/RESTServerCatalogAdapter.java:
##########
@@ -46,11 +53,30 @@ public <T extends RESTResponse> T handleRequest(
Consumer<Map<String, String>> responseHeaders) {
T restResponse = super.handleRequest(route, vars, httpRequest,
responseType, responseHeaders);
- if (restResponse instanceof LoadTableResponse) {
- if (PropertyUtil.propertyAsBoolean(
- catalogContext.configuration(), INCLUDE_CREDENTIALS, false)) {
- applyCredentials(
- catalogContext.configuration(), ((LoadTableResponse)
restResponse).config());
+ if (PropertyUtil.propertyAsBoolean(
+ catalogContext.configuration(), INCLUDE_CREDENTIALS, false)) {
+ if (restResponse instanceof LoadTableResponse response) {
+ applyCredentials(catalogContext.configuration(), response.config());
+ } else if (restResponse instanceof PlanTableScanResponse response
+ && PlanStatus.COMPLETED == response.planStatus()) {
+ return (T)
+ PlanTableScanResponse.builder()
+ .withPlanStatus(response.planStatus())
+ .withPlanId(response.planId())
+ .withFileScanTasks(response.fileScanTasks())
+ .withSpecsById(response.specsById())
+
.withCredentials(createStorageCredentials(catalogContext.configuration()))
+ .build();
Review Comment:
When rebuilding a completed PlanTableScanResponse to attach credentials, the
code drops fields from the original response (e.g., planTasks and deleteFiles).
That changes the semantics of the response and can break clients expecting
those tasks/files. Preserve all fields from the original response when creating
the new response (planTasks, fileScanTasks, deleteFiles, specsById, etc.) and
only add/override the credentials.
##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/TableWithIO.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Supplier;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DeleteFiles;
+import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.HistoryEntry;
+import org.apache.iceberg.IncrementalAppendScan;
+import org.apache.iceberg.IncrementalChangelogScan;
+import org.apache.iceberg.ManageSnapshots;
+import org.apache.iceberg.OverwriteFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionStatisticsFile;
+import org.apache.iceberg.PartitionStatisticsScan;
+import org.apache.iceberg.ReplacePartitions;
+import org.apache.iceberg.ReplaceSortOrder;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.RewriteManifests;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.UpdateLocation;
+import org.apache.iceberg.UpdatePartitionSpec;
+import org.apache.iceberg.UpdatePartitionStatistics;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.UpdateStatistics;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * This is a wrapper around {@link Table} and {@link FileIO} where the {@link
FileIO} instance might
+ * have more specific storage credentials and should be used for a table scan
+ */
+public class TableWithIO implements Table, HasTableOperations {
+ private final Table table;
+ private final Supplier<FileIO> fileIOForScan;
+
+ public TableWithIO(Table table, Supplier<FileIO> fileIOForScan) {
+ Preconditions.checkArgument(null != table, "Invalid table: null");
+ Preconditions.checkArgument(null != fileIOForScan, "Invalid FileIO
supplier: null");
+ this.table = table;
+ this.fileIOForScan = fileIOForScan;
+ }
+
+ @Override
+ public FileIO io() {
+ return fileIOForScan.get();
+ }
+
+ @Override
+ public void refresh() {
+ table.refresh();
+ }
+
+ @Override
+ public String name() {
+ return table.name();
+ }
+
+ @Override
+ public TableScan newScan() {
+ return table.newScan();
Review Comment:
TableWithIO.newScan() delegates directly to the wrapped table, so scans
created from TableWithIO will use the wrapped table's FileIO rather than the
FileIO exposed by TableWithIO.io(). That makes the wrapper internally
inconsistent and can reintroduce the original credential propagation problem if
TableWithIO is used outside the current Spark path. Consider overriding
newScan/newBatchScan to ensure the resulting scan uses the intended FileIO, or
explicitly document/guard against using this wrapper to create new scans.
```suggestion
throw new UnsupportedOperationException(
"TableWithIO must not be used to create new scans directly because
it would use the "
+ "underlying table's FileIO instead of the FileIO supplied via
TableWithIO.io().");
```
##########
open-api/src/testFixtures/java/org/apache/iceberg/rest/RESTServerCatalogAdapter.java:
##########
@@ -46,11 +53,30 @@ public <T extends RESTResponse> T handleRequest(
Consumer<Map<String, String>> responseHeaders) {
T restResponse = super.handleRequest(route, vars, httpRequest,
responseType, responseHeaders);
- if (restResponse instanceof LoadTableResponse) {
- if (PropertyUtil.propertyAsBoolean(
- catalogContext.configuration(), INCLUDE_CREDENTIALS, false)) {
- applyCredentials(
- catalogContext.configuration(), ((LoadTableResponse)
restResponse).config());
+ if (PropertyUtil.propertyAsBoolean(
+ catalogContext.configuration(), INCLUDE_CREDENTIALS, false)) {
+ if (restResponse instanceof LoadTableResponse response) {
+ applyCredentials(catalogContext.configuration(), response.config());
+ } else if (restResponse instanceof PlanTableScanResponse response
+ && PlanStatus.COMPLETED == response.planStatus()) {
+ return (T)
+ PlanTableScanResponse.builder()
+ .withPlanStatus(response.planStatus())
+ .withPlanId(response.planId())
+ .withFileScanTasks(response.fileScanTasks())
+ .withSpecsById(response.specsById())
+
.withCredentials(createStorageCredentials(catalogContext.configuration()))
+ .build();
+ } else if (restResponse instanceof FetchPlanningResultResponse response
+ && PlanStatus.COMPLETED == response.planStatus()) {
+ return (T)
+ FetchPlanningResultResponse.builder()
+ .withPlanStatus(response.planStatus())
+ .withFileScanTasks(response.fileScanTasks())
+ .withPlanTasks(response.planTasks())
+ .withSpecsById(response.specsById())
Review Comment:
When rebuilding a completed FetchPlanningResultResponse to attach
credentials, the code currently does not carry over deleteFiles from the
original response. If delete files are present, clients will silently lose
required information. Preserve deleteFiles (and any other fields) from the
original response when constructing the new response, and only add/override the
credentials.
```suggestion
.withSpecsById(response.specsById())
.withDeleteFiles(response.deleteFiles())
```
##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/TableWithIO.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Supplier;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DeleteFiles;
+import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.HistoryEntry;
+import org.apache.iceberg.IncrementalAppendScan;
+import org.apache.iceberg.IncrementalChangelogScan;
+import org.apache.iceberg.ManageSnapshots;
+import org.apache.iceberg.OverwriteFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionStatisticsFile;
+import org.apache.iceberg.PartitionStatisticsScan;
+import org.apache.iceberg.ReplacePartitions;
+import org.apache.iceberg.ReplaceSortOrder;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.RewriteManifests;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.UpdateLocation;
+import org.apache.iceberg.UpdatePartitionSpec;
+import org.apache.iceberg.UpdatePartitionStatistics;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.UpdateStatistics;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * This is a wrapper around {@link Table} and {@link FileIO} where the {@link
FileIO} instance might
+ * have more specific storage credentials and should be used for a table scan
+ */
+public class TableWithIO implements Table, HasTableOperations {
+ private final Table table;
+ private final Supplier<FileIO> fileIOForScan;
+
+ public TableWithIO(Table table, Supplier<FileIO> fileIOForScan) {
+ Preconditions.checkArgument(null != table, "Invalid table: null");
+ Preconditions.checkArgument(null != fileIOForScan, "Invalid FileIO
supplier: null");
+ this.table = table;
+ this.fileIOForScan = fileIOForScan;
+ }
+
+ @Override
+ public FileIO io() {
+ return fileIOForScan.get();
+ }
+
+ @Override
+ public void refresh() {
+ table.refresh();
+ }
+
+ @Override
+ public String name() {
+ return table.name();
+ }
+
+ @Override
+ public TableScan newScan() {
+ return table.newScan();
+ }
+
+ @Override
+ public Schema schema() {
+ return table.schema();
+ }
+
+ @Override
+ public Map<Integer, Schema> schemas() {
+ return table.schemas();
+ }
+
+ @Override
+ public PartitionSpec spec() {
+ return table.spec();
+ }
+
+ @Override
+ public Map<Integer, PartitionSpec> specs() {
+ return table.specs();
+ }
+
+ @Override
+ public SortOrder sortOrder() {
+ return table.sortOrder();
+ }
+
+ @Override
+ public Map<Integer, SortOrder> sortOrders() {
+ return table.sortOrders();
+ }
+
+ @Override
+ public Map<String, String> properties() {
+ return table.properties();
+ }
+
+ @Override
+ public String location() {
+ return table.location();
+ }
+
+ @Override
+ public Snapshot currentSnapshot() {
+ return table.currentSnapshot();
+ }
+
+ @Override
+ public Snapshot snapshot(long snapshotId) {
+ return table.snapshot(snapshotId);
+ }
+
+ @Override
+ public Iterable<Snapshot> snapshots() {
+ return table.snapshots();
+ }
+
+ @Override
+ public List<HistoryEntry> history() {
+ return table.history();
+ }
+
+ @Override
+ public UpdateSchema updateSchema() {
+ return table.updateSchema();
+ }
+
+ @Override
+ public UpdatePartitionSpec updateSpec() {
+ return table.updateSpec();
+ }
+
+ @Override
+ public UpdateProperties updateProperties() {
+ return table.updateProperties();
+ }
+
+ @Override
+ public ReplaceSortOrder replaceSortOrder() {
+ return table.replaceSortOrder();
+ }
+
+ @Override
+ public UpdateLocation updateLocation() {
+ return table.updateLocation();
+ }
+
+ @Override
+ public AppendFiles newAppend() {
+ return table.newAppend();
+ }
+
+ @Override
+ public RewriteFiles newRewrite() {
+ return table.newRewrite();
+ }
+
+ @Override
+ public RewriteManifests rewriteManifests() {
+ return table.rewriteManifests();
+ }
+
+ @Override
+ public OverwriteFiles newOverwrite() {
+ return table.newOverwrite();
+ }
+
+ @Override
+ public RowDelta newRowDelta() {
+ return table.newRowDelta();
+ }
+
+ @Override
+ public ReplacePartitions newReplacePartitions() {
+ return table.newReplacePartitions();
+ }
+
+ @Override
+ public DeleteFiles newDelete() {
+ return table.newDelete();
+ }
+
+ @Override
+ public ExpireSnapshots expireSnapshots() {
+ return table.expireSnapshots();
+ }
+
+ @Override
+ public ManageSnapshots manageSnapshots() {
+ return table.manageSnapshots();
+ }
+
+ @Override
+ public Transaction newTransaction() {
+ return table.newTransaction();
+ }
+
+ @Override
+ public EncryptionManager encryption() {
+ return table.encryption();
+ }
+
+ @Override
+ public LocationProvider locationProvider() {
+ return table.locationProvider();
+ }
+
+ @Override
+ public List<StatisticsFile> statisticsFiles() {
+ return table.statisticsFiles();
+ }
+
+ @Override
+ public Map<String, SnapshotRef> refs() {
+ return table.refs();
+ }
+
+ @Override
+ public IncrementalAppendScan newIncrementalAppendScan() {
+ return table.newIncrementalAppendScan();
+ }
+
+ @Override
+ public IncrementalChangelogScan newIncrementalChangelogScan() {
+ return table.newIncrementalChangelogScan();
+ }
+
+ @Override
+ public PartitionStatisticsScan newPartitionStatisticsScan() {
+ return table.newPartitionStatisticsScan();
+ }
+
+ @Override
+ public AppendFiles newFastAppend() {
+ return table.newFastAppend();
+ }
+
+ @Override
+ public UpdateStatistics updateStatistics() {
+ return table.updateStatistics();
+ }
+
+ @Override
+ public UpdatePartitionStatistics updatePartitionStatistics() {
+ return table.updatePartitionStatistics();
+ }
+
+ @Override
+ public List<PartitionStatisticsFile> partitionStatisticsFiles() {
+ return table.partitionStatisticsFiles();
+ }
+
+ @Override
+ public UUID uuid() {
+ return table.uuid();
+ }
+
+ @Override
+ public TableOperations operations() {
+ return table instanceof HasTableOperations ? ((HasTableOperations)
table).operations() : null;
+ }
Review Comment:
TableWithIO declares it implements HasTableOperations but operations() may
return null when the wrapped table is not a HasTableOperations. This violates
the HasTableOperations contract and can cause NPEs in callers that rely on it
(e.g., SerializableTable metadataFileLocation() calls operations().current()).
Either require the wrapped table to implement HasTableOperations (and fail fast
in the constructor) or drop HasTableOperations from this wrapper and avoid
exposing operations() at all.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]