This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 89d6bc323cc Fix Iceberg Integration tests (#34686)
89d6bc323cc is described below
commit 89d6bc323cc263eb2fc4779a9fc6fb8281d35bda
Author: akashorabek <[email protected]>
AuthorDate: Thu Apr 24 22:03:39 2025 +0500
Fix Iceberg Integration tests (#34686)
* Skip HiveCatalogIT and BigQueryMetastoreCatalogIT in Iceberg ITs
* Fix Iceberg Integration tests
* Change SALT var name
* Fix comments
* Decrease numRecords for DirectRunner
---
sdks/java/io/iceberg/build.gradle | 2 +-
.../sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java | 7 ++++---
.../apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java | 4 +++-
.../beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java | 14 ++++++++++----
4 files changed, 18 insertions(+), 9 deletions(-)
diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle
index 741e8ee554f..e4d148cf556 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -148,7 +148,7 @@ task integrationTest(type: Test) {
exclude '**/BigQueryMetastoreCatalogIT.class'
}
- maxParallelForks 4
+ maxParallelForks 1
classpath = sourceSets.test.runtimeClasspath
testClassesDirs = sourceSets.test.output.classesDirs
}
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java
index 3a8b47cb5a0..d224f0caafc 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java
@@ -48,7 +48,7 @@ public class BigQueryMetastoreCatalogIT extends IcebergCatalogBaseIT {
private static final BigqueryClient BQ_CLIENT = new
BigqueryClient("BigQueryMetastoreCatalogIT");
static final String BQMS_CATALOG =
"org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog";
static final String DATASET = "managed_iceberg_bqms_tests_" +
System.nanoTime();;
- static final long SALT = System.nanoTime();
+ private long salt = System.nanoTime();
@BeforeClass
public static void createDataset() throws IOException, InterruptedException {
@@ -62,11 +62,12 @@ public class BigQueryMetastoreCatalogIT extends IcebergCatalogBaseIT {
@Override
public String tableId() {
- return DATASET + "." + testName.getMethodName() + "_" + SALT;
+ return DATASET + "." + testName.getMethodName() + "_" + salt;
}
@Override
public Catalog createCatalog() {
+ salt += System.nanoTime();
return CatalogUtil.loadCatalog(
BQMS_CATALOG,
"bqms_" + catalogName,
@@ -82,7 +83,7 @@ public class BigQueryMetastoreCatalogIT extends IcebergCatalogBaseIT {
public void catalogCleanup() {
for (TableIdentifier tableIdentifier :
catalog.listTables(Namespace.of(DATASET))) {
// only delete tables that were created in this test run
- if (tableIdentifier.name().contains(String.valueOf(SALT))) {
+ if (tableIdentifier.name().contains(String.valueOf(salt))) {
catalog.dropTable(tableIdentifier);
}
}
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java
index acb0e36b4b0..4c3a620bebc 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java
@@ -41,6 +41,7 @@ import org.junit.BeforeClass;
*/
public class HiveCatalogIT extends IcebergCatalogBaseIT {
private static HiveMetastoreExtension hiveMetastoreExtension;
+ private long salt = System.nanoTime();
private String testDb() {
return "test_db_" + testName.getMethodName();
@@ -48,7 +49,7 @@ public class HiveCatalogIT extends IcebergCatalogBaseIT {
@Override
public String tableId() {
- return String.format("%s.%s", testDb(), "test_table");
+ return String.format("%s.%s", testDb(), "test_table" + "_" + salt);
}
@BeforeClass
@@ -73,6 +74,7 @@ public class HiveCatalogIT extends IcebergCatalogBaseIT {
@Override
public Catalog createCatalog() {
+ salt += System.nanoTime();
return CatalogUtil.loadCatalog(
HiveCatalog.class.getName(),
"hive_" + catalogName,
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
index 6f6e475fd85..a6b4df69f42 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
@@ -133,6 +133,8 @@ import org.slf4j.LoggerFactory;
* #numRecords()}.
*/
public abstract class IcebergCatalogBaseIT implements Serializable {
+ private static final long SETUP_TEARDOWN_SLEEP_MS = 5000;
+
public abstract Catalog createCatalog();
public abstract Map<String, Object> managedIcebergConfig(String tableId);
@@ -142,7 +144,7 @@ public abstract class IcebergCatalogBaseIT implements Serializable {
public void catalogCleanup() throws Exception {}
public Integer numRecords() {
- return 1000;
+ return OPTIONS.getRunner().equals(DirectRunner.class) ? 10 : 1000;
}
public String tableId() {
@@ -159,7 +161,8 @@ public abstract class IcebergCatalogBaseIT implements Serializable {
@Before
public void setUp() throws Exception {
- OPTIONS.as(DirectOptions.class).setTargetParallelism(3);
+ catalogName += System.nanoTime();
+ OPTIONS.as(DirectOptions.class).setTargetParallelism(1);
warehouse =
String.format(
"%s/%s/%s",
@@ -169,12 +172,14 @@ public abstract class IcebergCatalogBaseIT implements Serializable {
warehouse = warehouse(getClass());
catalogSetup();
catalog = createCatalog();
+ Thread.sleep(SETUP_TEARDOWN_SLEEP_MS);
}
@After
public void cleanUp() throws Exception {
try {
catalogCleanup();
+ Thread.sleep(SETUP_TEARDOWN_SLEEP_MS);
} catch (Exception e) {
LOG.warn("Catalog cleanup failed.", e);
}
@@ -201,6 +206,7 @@ public abstract class IcebergCatalogBaseIT implements Serializable {
.collect(Collectors.toList());
gcsUtil.remove(filesToDelete);
}
+ Thread.sleep(SETUP_TEARDOWN_SLEEP_MS);
} catch (Exception e) {
LOG.warn("Failed to clean up GCS files.", e);
}
@@ -216,9 +222,9 @@ public abstract class IcebergCatalogBaseIT implements Serializable {
@Rule
public transient Timeout globalTimeout =
- Timeout.seconds(OPTIONS.getRunner().equals(DirectRunner.class) ? 180 : 20 * 60);
+ Timeout.seconds(OPTIONS.getRunner().equals(DirectRunner.class) ? 300 : 20 * 60);
- private static final int NUM_SHARDS = 10;
+ private static final int NUM_SHARDS = OPTIONS.getRunner().equals(DirectRunner.class) ? 1 : 10;
private static final Logger LOG =
LoggerFactory.getLogger(IcebergCatalogBaseIT.class);
private static final Schema DOUBLY_NESTED_ROW_SCHEMA =
Schema.builder()