This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new b4bb71fc40 Spark: Backport #14933: Snapshot location overlap check to
spark v3.4, v3.5, v4.0 (#15016)
b4bb71fc40 is described below
commit b4bb71fc408d17d2c724f7a2faf622759746523d
Author: Varun Lakhyani <[email protected]>
AuthorDate: Sat Jan 10 23:00:07 2026 +0530
Spark: Backport #14933: Snapshot location overlap check to spark v3.4,
v3.5, v4.0 (#15016)
* Backport #14933 Snapshot location overlap check to spark v3.4, v3.5, v4.0
* spotlessApply
---
.../spark/actions/SnapshotTableSparkAction.java | 11 ++-
.../spark/actions/TestSnapshotTableAction.java | 93 ++++++++++++++++++++++
.../spark/actions/SnapshotTableSparkAction.java | 11 ++-
.../spark/actions/TestSnapshotTableAction.java | 93 ++++++++++++++++++++++
.../spark/actions/SnapshotTableSparkAction.java | 11 ++-
.../spark/actions/TestSnapshotTableAction.java | 93 ++++++++++++++++++++++
6 files changed, 309 insertions(+), 3 deletions(-)
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
index 5f7f408cb0..043b63870c 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
@@ -124,7 +124,16 @@ public class SnapshotTableSparkAction extends
BaseTableCreationSparkAction<Snaps
StagedSparkTable stagedTable = stageDestTable();
Table icebergTable = stagedTable.table();
- // TODO: Check the dest table location does not overlap with the source
table location
+ String sourceTableLocation = sourceTableLocation();
+ String stagedTableLocation = icebergTable.location();
+ Preconditions.checkArgument(
+ !sourceTableLocation.equals(stagedTableLocation)
+ && !stagedTableLocation.startsWith(sourceTableLocation + "/")
+ && !sourceTableLocation.startsWith(stagedTableLocation + "/"),
+ "Cannot create a snapshot at location %s because it would overlap with
source table location %s. "
+ + "Overlapping snapshot and source would mix table files.",
+ stagedTableLocation,
+ sourceTableLocation);
boolean threw = true;
try {
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java
index d9c42a07b8..9fac633e75 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java
@@ -18,7 +18,11 @@
*/
package org.apache.iceberg.spark.actions;
+import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE;
+import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.IOException;
import java.nio.file.Files;
@@ -33,6 +37,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(ParameterizedTestExtension.class)
public class TestSnapshotTableAction extends CatalogTestBase {
private static final String SOURCE_NAME = "spark_catalog.default.source";
+ private static final String SOURCE = "source";
@AfterEach
public void removeTables() {
@@ -65,4 +70,92 @@ public class TestSnapshotTableAction extends CatalogTestBase
{
.execute();
assertThat(snapshotThreadsIndex.get()).isEqualTo(2);
}
+
+ @TestTemplate
+ public void testSnapshotWithOverlappingLocation() throws IOException {
+ // Hadoop catalogs do not support custom table locations
+ String catalogType = catalogConfig.get(ICEBERG_CATALOG_TYPE);
+ assumeThat(catalogType).isNotEqualTo(ICEBERG_CATALOG_TYPE_HADOOP);
+
+ String sourceLocation =
+ Files.createTempDirectory(temp,
"junit").resolve(SOURCE).toFile().toString();
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet
LOCATION '%s'",
+ SOURCE_NAME, sourceLocation);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", SOURCE_NAME);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", SOURCE_NAME);
+ String actualSourceLocation =
+ spark
+ .sql(String.format("DESCRIBE EXTENDED %s", SOURCE_NAME))
+ .filter("col_name = 'Location'")
+ .select("data_type")
+ .first()
+ .getString(0);
+
+ assertThatThrownBy(
+ () ->
+ SparkActions.get()
+ .snapshotTable(SOURCE_NAME)
+ .as(tableName)
+ .tableLocation(actualSourceLocation)
+ .execute())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageStartingWith(
+ "The snapshot table location cannot be same as the source table
location.");
+
+ String destAsSubdirectory = actualSourceLocation + "/nested";
+ assertThatThrownBy(
+ () ->
+ SparkActions.get()
+ .snapshotTable(SOURCE_NAME)
+ .as(tableName)
+ .tableLocation(destAsSubdirectory)
+ .execute())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageStartingWith("Cannot create a snapshot at location");
+
+ String parentLocation =
+ actualSourceLocation.substring(0, actualSourceLocation.length() - ("/"
+ SOURCE).length());
+ assertThatThrownBy(
+ () ->
+ SparkActions.get()
+ .snapshotTable(SOURCE_NAME)
+ .as(tableName)
+ .tableLocation(parentLocation)
+ .execute())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageStartingWith("Cannot create a snapshot at location");
+ }
+
+ @TestTemplate
+ public void testSnapshotWithNonOverlappingLocation() throws IOException {
+ // Hadoop catalogs do not support custom table locations
+ String catalogType = catalogConfig.get(ICEBERG_CATALOG_TYPE);
+ assumeThat(catalogType).isNotEqualTo(ICEBERG_CATALOG_TYPE_HADOOP);
+
+ String sourceLocation =
+ Files.createTempDirectory(temp,
"junit").resolve(SOURCE).toFile().toString();
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet
LOCATION '%s'",
+ SOURCE_NAME, sourceLocation);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", SOURCE_NAME);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", SOURCE_NAME);
+ String actualSourceLocation =
+ spark
+ .sql(String.format("DESCRIBE EXTENDED %s", SOURCE_NAME))
+ .filter("col_name = 'Location'")
+ .select("data_type")
+ .first()
+ .getString(0);
+
+ String validDestLocation =
+ actualSourceLocation.substring(0, actualSourceLocation.length() -
SOURCE.length())
+ + "newDestination";
+ SparkActions.get()
+ .snapshotTable(SOURCE_NAME)
+ .as(tableName)
+ .tableLocation(validDestLocation)
+ .execute();
+ assertThat(sql("SELECT * FROM %s", tableName)).hasSize(2);
+ }
}
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
index 5f7f408cb0..043b63870c 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
@@ -124,7 +124,16 @@ public class SnapshotTableSparkAction extends
BaseTableCreationSparkAction<Snaps
StagedSparkTable stagedTable = stageDestTable();
Table icebergTable = stagedTable.table();
- // TODO: Check the dest table location does not overlap with the source
table location
+ String sourceTableLocation = sourceTableLocation();
+ String stagedTableLocation = icebergTable.location();
+ Preconditions.checkArgument(
+ !sourceTableLocation.equals(stagedTableLocation)
+ && !stagedTableLocation.startsWith(sourceTableLocation + "/")
+ && !sourceTableLocation.startsWith(stagedTableLocation + "/"),
+ "Cannot create a snapshot at location %s because it would overlap with
source table location %s. "
+ + "Overlapping snapshot and source would mix table files.",
+ stagedTableLocation,
+ sourceTableLocation);
boolean threw = true;
try {
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java
index d9c42a07b8..9fac633e75 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java
@@ -18,7 +18,11 @@
*/
package org.apache.iceberg.spark.actions;
+import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE;
+import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.IOException;
import java.nio.file.Files;
@@ -33,6 +37,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(ParameterizedTestExtension.class)
public class TestSnapshotTableAction extends CatalogTestBase {
private static final String SOURCE_NAME = "spark_catalog.default.source";
+ private static final String SOURCE = "source";
@AfterEach
public void removeTables() {
@@ -65,4 +70,92 @@ public class TestSnapshotTableAction extends CatalogTestBase
{
.execute();
assertThat(snapshotThreadsIndex.get()).isEqualTo(2);
}
+
+ @TestTemplate
+ public void testSnapshotWithOverlappingLocation() throws IOException {
+ // Hadoop catalogs do not support custom table locations
+ String catalogType = catalogConfig.get(ICEBERG_CATALOG_TYPE);
+ assumeThat(catalogType).isNotEqualTo(ICEBERG_CATALOG_TYPE_HADOOP);
+
+ String sourceLocation =
+ Files.createTempDirectory(temp,
"junit").resolve(SOURCE).toFile().toString();
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet
LOCATION '%s'",
+ SOURCE_NAME, sourceLocation);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", SOURCE_NAME);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", SOURCE_NAME);
+ String actualSourceLocation =
+ spark
+ .sql(String.format("DESCRIBE EXTENDED %s", SOURCE_NAME))
+ .filter("col_name = 'Location'")
+ .select("data_type")
+ .first()
+ .getString(0);
+
+ assertThatThrownBy(
+ () ->
+ SparkActions.get()
+ .snapshotTable(SOURCE_NAME)
+ .as(tableName)
+ .tableLocation(actualSourceLocation)
+ .execute())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageStartingWith(
+ "The snapshot table location cannot be same as the source table
location.");
+
+ String destAsSubdirectory = actualSourceLocation + "/nested";
+ assertThatThrownBy(
+ () ->
+ SparkActions.get()
+ .snapshotTable(SOURCE_NAME)
+ .as(tableName)
+ .tableLocation(destAsSubdirectory)
+ .execute())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageStartingWith("Cannot create a snapshot at location");
+
+ String parentLocation =
+ actualSourceLocation.substring(0, actualSourceLocation.length() - ("/"
+ SOURCE).length());
+ assertThatThrownBy(
+ () ->
+ SparkActions.get()
+ .snapshotTable(SOURCE_NAME)
+ .as(tableName)
+ .tableLocation(parentLocation)
+ .execute())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageStartingWith("Cannot create a snapshot at location");
+ }
+
+ @TestTemplate
+ public void testSnapshotWithNonOverlappingLocation() throws IOException {
+ // Hadoop catalogs do not support custom table locations
+ String catalogType = catalogConfig.get(ICEBERG_CATALOG_TYPE);
+ assumeThat(catalogType).isNotEqualTo(ICEBERG_CATALOG_TYPE_HADOOP);
+
+ String sourceLocation =
+ Files.createTempDirectory(temp,
"junit").resolve(SOURCE).toFile().toString();
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet
LOCATION '%s'",
+ SOURCE_NAME, sourceLocation);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", SOURCE_NAME);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", SOURCE_NAME);
+ String actualSourceLocation =
+ spark
+ .sql(String.format("DESCRIBE EXTENDED %s", SOURCE_NAME))
+ .filter("col_name = 'Location'")
+ .select("data_type")
+ .first()
+ .getString(0);
+
+ String validDestLocation =
+ actualSourceLocation.substring(0, actualSourceLocation.length() -
SOURCE.length())
+ + "newDestination";
+ SparkActions.get()
+ .snapshotTable(SOURCE_NAME)
+ .as(tableName)
+ .tableLocation(validDestLocation)
+ .execute();
+ assertThat(sql("SELECT * FROM %s", tableName)).hasSize(2);
+ }
}
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
index 5f7f408cb0..043b63870c 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java
@@ -124,7 +124,16 @@ public class SnapshotTableSparkAction extends
BaseTableCreationSparkAction<Snaps
StagedSparkTable stagedTable = stageDestTable();
Table icebergTable = stagedTable.table();
- // TODO: Check the dest table location does not overlap with the source
table location
+ String sourceTableLocation = sourceTableLocation();
+ String stagedTableLocation = icebergTable.location();
+ Preconditions.checkArgument(
+ !sourceTableLocation.equals(stagedTableLocation)
+ && !stagedTableLocation.startsWith(sourceTableLocation + "/")
+ && !sourceTableLocation.startsWith(stagedTableLocation + "/"),
+ "Cannot create a snapshot at location %s because it would overlap with
source table location %s. "
+ + "Overlapping snapshot and source would mix table files.",
+ stagedTableLocation,
+ sourceTableLocation);
boolean threw = true;
try {
diff --git
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java
index d9c42a07b8..9fac633e75 100644
---
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java
+++
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestSnapshotTableAction.java
@@ -18,7 +18,11 @@
*/
package org.apache.iceberg.spark.actions;
+import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE;
+import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.IOException;
import java.nio.file.Files;
@@ -33,6 +37,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(ParameterizedTestExtension.class)
public class TestSnapshotTableAction extends CatalogTestBase {
private static final String SOURCE_NAME = "spark_catalog.default.source";
+ private static final String SOURCE = "source";
@AfterEach
public void removeTables() {
@@ -65,4 +70,92 @@ public class TestSnapshotTableAction extends CatalogTestBase
{
.execute();
assertThat(snapshotThreadsIndex.get()).isEqualTo(2);
}
+
+ @TestTemplate
+ public void testSnapshotWithOverlappingLocation() throws IOException {
+ // Hadoop catalogs do not support custom table locations
+ String catalogType = catalogConfig.get(ICEBERG_CATALOG_TYPE);
+ assumeThat(catalogType).isNotEqualTo(ICEBERG_CATALOG_TYPE_HADOOP);
+
+ String sourceLocation =
+ Files.createTempDirectory(temp,
"junit").resolve(SOURCE).toFile().toString();
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet
LOCATION '%s'",
+ SOURCE_NAME, sourceLocation);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", SOURCE_NAME);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", SOURCE_NAME);
+ String actualSourceLocation =
+ spark
+ .sql(String.format("DESCRIBE EXTENDED %s", SOURCE_NAME))
+ .filter("col_name = 'Location'")
+ .select("data_type")
+ .first()
+ .getString(0);
+
+ assertThatThrownBy(
+ () ->
+ SparkActions.get()
+ .snapshotTable(SOURCE_NAME)
+ .as(tableName)
+ .tableLocation(actualSourceLocation)
+ .execute())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageStartingWith(
+ "The snapshot table location cannot be same as the source table
location.");
+
+ String destAsSubdirectory = actualSourceLocation + "/nested";
+ assertThatThrownBy(
+ () ->
+ SparkActions.get()
+ .snapshotTable(SOURCE_NAME)
+ .as(tableName)
+ .tableLocation(destAsSubdirectory)
+ .execute())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageStartingWith("Cannot create a snapshot at location");
+
+ String parentLocation =
+ actualSourceLocation.substring(0, actualSourceLocation.length() - ("/"
+ SOURCE).length());
+ assertThatThrownBy(
+ () ->
+ SparkActions.get()
+ .snapshotTable(SOURCE_NAME)
+ .as(tableName)
+ .tableLocation(parentLocation)
+ .execute())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageStartingWith("Cannot create a snapshot at location");
+ }
+
+ @TestTemplate
+ public void testSnapshotWithNonOverlappingLocation() throws IOException {
+ // Hadoop catalogs do not support custom table locations
+ String catalogType = catalogConfig.get(ICEBERG_CATALOG_TYPE);
+ assumeThat(catalogType).isNotEqualTo(ICEBERG_CATALOG_TYPE_HADOOP);
+
+ String sourceLocation =
+ Files.createTempDirectory(temp,
"junit").resolve(SOURCE).toFile().toString();
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet
LOCATION '%s'",
+ SOURCE_NAME, sourceLocation);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", SOURCE_NAME);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", SOURCE_NAME);
+ String actualSourceLocation =
+ spark
+ .sql(String.format("DESCRIBE EXTENDED %s", SOURCE_NAME))
+ .filter("col_name = 'Location'")
+ .select("data_type")
+ .first()
+ .getString(0);
+
+ String validDestLocation =
+ actualSourceLocation.substring(0, actualSourceLocation.length() -
SOURCE.length())
+ + "newDestination";
+ SparkActions.get()
+ .snapshotTable(SOURCE_NAME)
+ .as(tableName)
+ .tableLocation(validDestLocation)
+ .execute();
+ assertThat(sql("SELECT * FROM %s", tableName)).hasSize(2);
+ }
}