This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 89d6bc323cc Fix Iceberg Integration tests (#34686)
89d6bc323cc is described below

commit 89d6bc323cc263eb2fc4779a9fc6fb8281d35bda
Author: akashorabek <[email protected]>
AuthorDate: Thu Apr 24 22:03:39 2025 +0500

    Fix Iceberg Integration tests (#34686)
    
    * Skip HiveCatalogIT and BigQueryMetastoreCatalogIT in Iceberg ITs
    
    * Fix Iceberg Integration tests
    
    * Change SALT var name
    
    * Fix comments
    
    * Decrease numRecords for DirectRunner
---
 sdks/java/io/iceberg/build.gradle                          |  2 +-
 .../sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java |  7 ++++---
 .../apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java  |  4 +++-
 .../beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java  | 14 ++++++++++----
 4 files changed, 18 insertions(+), 9 deletions(-)

diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle
index 741e8ee554f..e4d148cf556 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -148,7 +148,7 @@ task integrationTest(type: Test) {
         exclude '**/BigQueryMetastoreCatalogIT.class'
     }
 
-    maxParallelForks 4
+    maxParallelForks 1
     classpath = sourceSets.test.runtimeClasspath
     testClassesDirs = sourceSets.test.output.classesDirs
 }
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java
index 3a8b47cb5a0..d224f0caafc 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java
@@ -48,7 +48,7 @@ public class BigQueryMetastoreCatalogIT extends IcebergCatalogBaseIT {
   private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryMetastoreCatalogIT");
   static final String BQMS_CATALOG = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog";
   static final String DATASET = "managed_iceberg_bqms_tests_" + System.nanoTime();;
-  static final long SALT = System.nanoTime();
+  private long salt = System.nanoTime();
 
   @BeforeClass
   public static void createDataset() throws IOException, InterruptedException {
@@ -62,11 +62,12 @@ public class BigQueryMetastoreCatalogIT extends IcebergCatalogBaseIT {
 
   @Override
   public String tableId() {
-    return DATASET + "." + testName.getMethodName() + "_" + SALT;
+    return DATASET + "." + testName.getMethodName() + "_" + salt;
   }
 
   @Override
   public Catalog createCatalog() {
+    salt += System.nanoTime();
     return CatalogUtil.loadCatalog(
         BQMS_CATALOG,
         "bqms_" + catalogName,
@@ -82,7 +83,7 @@ public class BigQueryMetastoreCatalogIT extends IcebergCatalogBaseIT {
   public void catalogCleanup() {
     for (TableIdentifier tableIdentifier : catalog.listTables(Namespace.of(DATASET))) {
       // only delete tables that were created in this test run
-      if (tableIdentifier.name().contains(String.valueOf(SALT))) {
+      if (tableIdentifier.name().contains(String.valueOf(salt))) {
         catalog.dropTable(tableIdentifier);
       }
     }
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java
index acb0e36b4b0..4c3a620bebc 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java
@@ -41,6 +41,7 @@ import org.junit.BeforeClass;
  */
 public class HiveCatalogIT extends IcebergCatalogBaseIT {
   private static HiveMetastoreExtension hiveMetastoreExtension;
+  private long salt = System.nanoTime();
 
   private String testDb() {
     return "test_db_" + testName.getMethodName();
@@ -48,7 +49,7 @@ public class HiveCatalogIT extends IcebergCatalogBaseIT {
 
   @Override
   public String tableId() {
-    return String.format("%s.%s", testDb(), "test_table");
+    return String.format("%s.%s", testDb(), "test_table" + "_" + salt);
   }
 
   @BeforeClass
@@ -73,6 +74,7 @@ public class HiveCatalogIT extends IcebergCatalogBaseIT {
 
   @Override
   public Catalog createCatalog() {
+    salt += System.nanoTime();
     return CatalogUtil.loadCatalog(
         HiveCatalog.class.getName(),
         "hive_" + catalogName,
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
index 6f6e475fd85..a6b4df69f42 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java
@@ -133,6 +133,8 @@ import org.slf4j.LoggerFactory;
  * #numRecords()}.
  */
 public abstract class IcebergCatalogBaseIT implements Serializable {
+  private static final long SETUP_TEARDOWN_SLEEP_MS = 5000;
+
   public abstract Catalog createCatalog();
 
   public abstract Map<String, Object> managedIcebergConfig(String tableId);
@@ -142,7 +144,7 @@ public abstract class IcebergCatalogBaseIT implements Serializable {
   public void catalogCleanup() throws Exception {}
 
   public Integer numRecords() {
-    return 1000;
+    return OPTIONS.getRunner().equals(DirectRunner.class) ? 10 : 1000;
   }
 
   public String tableId() {
@@ -159,7 +161,8 @@ public abstract class IcebergCatalogBaseIT implements Serializable {
 
   @Before
   public void setUp() throws Exception {
-    OPTIONS.as(DirectOptions.class).setTargetParallelism(3);
+    catalogName += System.nanoTime();
+    OPTIONS.as(DirectOptions.class).setTargetParallelism(1);
     warehouse =
         String.format(
             "%s/%s/%s",
@@ -169,12 +172,14 @@ public abstract class IcebergCatalogBaseIT implements Serializable {
     warehouse = warehouse(getClass());
     catalogSetup();
     catalog = createCatalog();
+    Thread.sleep(SETUP_TEARDOWN_SLEEP_MS);
   }
 
   @After
   public void cleanUp() throws Exception {
     try {
       catalogCleanup();
+      Thread.sleep(SETUP_TEARDOWN_SLEEP_MS);
     } catch (Exception e) {
       LOG.warn("Catalog cleanup failed.", e);
     }
@@ -201,6 +206,7 @@ public abstract class IcebergCatalogBaseIT implements Serializable {
                 .collect(Collectors.toList());
         gcsUtil.remove(filesToDelete);
       }
+      Thread.sleep(SETUP_TEARDOWN_SLEEP_MS);
     } catch (Exception e) {
       LOG.warn("Failed to clean up GCS files.", e);
     }
@@ -216,9 +222,9 @@ public abstract class IcebergCatalogBaseIT implements Serializable {
 
   @Rule
   public transient Timeout globalTimeout =
-      Timeout.seconds(OPTIONS.getRunner().equals(DirectRunner.class) ? 180 : 20 * 60);
+      Timeout.seconds(OPTIONS.getRunner().equals(DirectRunner.class) ? 300 : 20 * 60);
 
-  private static final int NUM_SHARDS = 10;
+  private static final int NUM_SHARDS = OPTIONS.getRunner().equals(DirectRunner.class) ? 1 : 10;
   private static final Logger LOG = LoggerFactory.getLogger(IcebergCatalogBaseIT.class);
   private static final Schema DOUBLY_NESTED_ROW_SCHEMA =
       Schema.builder()

Reply via email to