This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new f246614ed9 Flink: Backport #9078 to v1.16 and v1.15 (#9151)
f246614ed9 is described below

commit f246614ed9d98f84eae13ff57768d377a95d3ced
Author: CG <[email protected]>
AuthorDate: Mon Nov 27 18:16:19 2023 +0800

    Flink: Backport #9078 to v1.16 and v1.15 (#9151)
---
 .../iceberg/flink/MiniFlinkClusterExtension.java   |  53 +++++++++
 .../java/org/apache/iceberg/flink/TestBase.java    | 130 +++++++++++++++++++++
 .../iceberg/flink/TestCatalogTableLoader.java      |  21 ++--
 .../iceberg/flink/MiniFlinkClusterExtension.java   |  53 +++++++++
 .../java/org/apache/iceberg/flink/TestBase.java    | 130 +++++++++++++++++++++
 .../iceberg/flink/TestCatalogTableLoader.java      |  21 ++--
 6 files changed, 388 insertions(+), 20 deletions(-)

diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/MiniFlinkClusterExtension.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/MiniFlinkClusterExtension.java
new file mode 100644
index 0000000000..9a73b80e07
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/MiniFlinkClusterExtension.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+public class MiniFlinkClusterExtension {
+
+  private static final int DEFAULT_TM_NUM = 1;
+  private static final int DEFAULT_PARALLELISM = 4;
+
+  public static final Configuration DISABLE_CLASSLOADER_CHECK_CONFIG =
+      new Configuration()
+          // disable classloader check as Avro may cache class/object in the serializers.
+          .set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
+
+  private MiniFlinkClusterExtension() {}
+
+  /**
+   * Creates a mini cluster extension that runs with classloader.check-leaked-classloader=false,
+   * so that the class loader leak check does not break the unit tests. Some of our Iceberg
+   * integration tests assert results after the Flink jobs have finished and may therefore touch
+   * a class loader that the Flink task managers have already closed; enabling
+   * classloader.check-leaked-classloader by default would fail those tests.
+   */
+  public static MiniClusterExtension createWithClassloaderCheckDisabled() {
+    return new MiniClusterExtension(
+        new MiniClusterResourceConfiguration.Builder()
+            .setNumberTaskManagers(DEFAULT_TM_NUM)
+            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+            .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
+            .build());
+  }
+}
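
For reference, a minimal sketch of how a JUnit 5 test can wire in this factory (the test class and method below are hypothetical, not part of this commit):

    import org.apache.flink.test.junit5.MiniClusterExtension;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.extension.RegisterExtension;

    class ExampleMiniClusterTest {

      // JUnit 5 discovers the extension via @RegisterExtension; this replaces the
      // JUnit 4 @ClassRule + MiniClusterWithClientResource pattern.
      @RegisterExtension
      static final MiniClusterExtension MINI_CLUSTER =
          MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();

      @Test
      void runsOnMiniCluster() {
        // Jobs submitted from this test run on the shared mini cluster with
        // classloader.check-leaked-classloader disabled.
      }
    }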
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestBase.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
new file mode 100644
index 0000000000..4fc0207f26
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+public abstract class TestBase extends TestBaseUtils {
+
+  @RegisterExtension
+  public static MiniClusterExtension miniClusterResource =
+      MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
+
+  @TempDir Path temporaryDirectory;
+
+  private static TestHiveMetastore metastore = null;
+  protected static HiveConf hiveConf = null;
+  protected static HiveCatalog catalog = null;
+
+  private volatile TableEnvironment tEnv = null;
+
+  @BeforeAll
+  public static void startMetastore() {
+    TestBase.metastore = new TestHiveMetastore();
+    metastore.start();
+    TestBase.hiveConf = metastore.hiveConf();
+    TestBase.catalog =
+        (HiveCatalog)
+            CatalogUtil.loadCatalog(
+                HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf);
+  }
+
+  @AfterAll
+  public static void stopMetastore() throws Exception {
+    metastore.stop();
+    TestBase.catalog = null;
+  }
+
+  protected TableEnvironment getTableEnv() {
+    if (tEnv == null) {
+      synchronized (this) {
+        if (tEnv == null) {
+          EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+
+          TableEnvironment env = TableEnvironment.create(settings);
+          env.getConfig()
+              .getConfiguration()
+              .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);
+          tEnv = env;
+        }
+      }
+    }
+    return tEnv;
+  }
+
+  protected static TableResult exec(TableEnvironment env, String query, Object... args) {
+    return env.executeSql(String.format(query, args));
+  }
+
+  protected TableResult exec(String query, Object... args) {
+    return exec(getTableEnv(), query, args);
+  }
+
+  protected List<Row> sql(String query, Object... args) {
+    TableResult tableResult = exec(query, args);
+    try (CloseableIterator<Row> iter = tableResult.collect()) {
+      return Lists.newArrayList(iter);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to collect table result", e);
+    }
+  }
+
+  protected void assertSameElements(Iterable<Row> expected, Iterable<Row> actual) {
+    Assertions.assertThat(actual).isNotNull().containsExactlyInAnyOrderElementsOf(expected);
+  }
+
+  protected void assertSameElements(String message, Iterable<Row> expected, Iterable<Row> actual) {
+    Assertions.assertThat(actual)
+        .isNotNull()
+        .as(message)
+        .containsExactlyInAnyOrderElementsOf(expected);
+  }
+
+  /**
+   * We cannot drop the currently used catalog after FLINK-29677, so we have to make sure that we
+   * are not using the current catalog before dropping it. This method switches to the
+   * 'default_catalog' and drops the requested one.
+   *
+   * @param catalogName The catalog to drop
+   * @param ifExists Whether to add 'IF EXISTS' to the DROP CATALOG statement
+   */
+  protected void dropCatalog(String catalogName, boolean ifExists) {
+    sql("USE CATALOG default_catalog");
+    sql("DROP CATALOG %s %s", ifExists ? "IF EXISTS" : "", catalogName);
+  }
+}
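
The new TestBase is meant to be extended by the migrated JUnit 5 tests; a hypothetical sketch of its helpers in use (the catalog name and DDL below are illustrative only):

    import java.util.List;
    import org.apache.flink.types.Row;
    import org.junit.jupiter.api.Test;

    public class ExampleSqlTest extends TestBase {

      @Test
      public void testCatalogRoundTrip() {
        // sql() formats the statement, runs it on the lazily created batch
        // TableEnvironment, and collects the result rows.
        sql(
            "CREATE CATALOG hadoop_cat WITH ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='file://%s')",
            temporaryDirectory);
        sql("USE CATALOG hadoop_cat");
        List<Row> rows = sql("SELECT 1");

        // dropCatalog() first switches to default_catalog (required since
        // FLINK-29677), then issues DROP CATALOG IF EXISTS hadoop_cat.
        dropCatalog("hadoop_cat", true);
      }
    }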
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
index b3a2d45261..147d2a173d 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
@@ -35,32 +35,33 @@ import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
 import org.assertj.core.api.Assertions;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 
 /** Test for {@link TableLoader}. */
-public class TestCatalogTableLoader extends FlinkTestBase {
+public class TestCatalogTableLoader extends TestBase {
 
   private static File warehouse = null;
   private static final TableIdentifier IDENTIFIER = TableIdentifier.of("default", "my_table");
   private static final Schema SCHEMA =
       new Schema(Types.NestedField.required(1, "f1", Types.StringType.get()));
 
-  @BeforeClass
+  @BeforeAll
   public static void createWarehouse() throws IOException {
     warehouse = File.createTempFile("warehouse", null);
-    Assert.assertTrue(warehouse.delete());
+    Assertions.assertThat(warehouse.delete()).isTrue();
     hiveConf.set("my_key", "my_value");
   }
 
-  @AfterClass
+  @AfterAll
   public static void dropWarehouse() throws IOException {
     if (warehouse != null && warehouse.exists()) {
       Path warehousePath = new Path(warehouse.getAbsolutePath());
       FileSystem fs = warehousePath.getFileSystem(hiveConf);
-      Assert.assertTrue("Failed to delete " + warehousePath, fs.delete(warehousePath, true));
+      Assertions.assertThat(fs.delete(warehousePath, true))
+          .as("Failed to delete " + warehousePath)
+          .isTrue();
     }
   }
 
@@ -97,7 +98,7 @@ public class TestCatalogTableLoader extends FlinkTestBase {
         .as("FileIO should be a HadoopFileIO")
         .isInstanceOf(HadoopFileIO.class);
     HadoopFileIO hadoopIO = (HadoopFileIO) io;
-    Assert.assertEquals("my_value", hadoopIO.conf().get("my_key"));
+    Assertions.assertThat(hadoopIO.conf().get("my_key")).isEqualTo("my_value");
   }
 
   @SuppressWarnings("unchecked")
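
The assertion changes in this file follow the usual JUnit 4 to AssertJ mapping applied across the migration; roughly (the variable names below are illustrative):

    // Before (JUnit 4):
    //   Assert.assertTrue("Failed to delete " + path, deleted);
    //   Assert.assertEquals("my_value", conf.get("my_key"));
    // After (AssertJ; note that as() must precede the terminal assertion):
    Assertions.assertThat(deleted).as("Failed to delete " + path).isTrue();
    Assertions.assertThat(conf.get("my_key")).isEqualTo("my_value");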
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/MiniFlinkClusterExtension.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/MiniFlinkClusterExtension.java
new file mode 100644
index 0000000000..9a73b80e07
--- /dev/null
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/MiniFlinkClusterExtension.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+public class MiniFlinkClusterExtension {
+
+  private static final int DEFAULT_TM_NUM = 1;
+  private static final int DEFAULT_PARALLELISM = 4;
+
+  public static final Configuration DISABLE_CLASSLOADER_CHECK_CONFIG =
+      new Configuration()
+          // disable classloader check as Avro may cache class/object in the serializers.
+          .set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
+
+  private MiniFlinkClusterExtension() {}
+
+  /**
+   * Creates a mini cluster extension that runs with classloader.check-leaked-classloader=false,
+   * so that the class loader leak check does not break the unit tests. Some of our Iceberg
+   * integration tests assert results after the Flink jobs have finished and may therefore touch
+   * a class loader that the Flink task managers have already closed; enabling
+   * classloader.check-leaked-classloader by default would fail those tests.
+   */
+  public static MiniClusterExtension createWithClassloaderCheckDisabled() {
+    return new MiniClusterExtension(
+        new MiniClusterResourceConfiguration.Builder()
+            .setNumberTaskManagers(DEFAULT_TM_NUM)
+            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+            .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
+            .build());
+  }
+}
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestBase.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
new file mode 100644
index 0000000000..4fc0207f26
--- /dev/null
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+public abstract class TestBase extends TestBaseUtils {
+
+  @RegisterExtension
+  public static MiniClusterExtension miniClusterResource =
+      MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
+
+  @TempDir Path temporaryDirectory;
+
+  private static TestHiveMetastore metastore = null;
+  protected static HiveConf hiveConf = null;
+  protected static HiveCatalog catalog = null;
+
+  private volatile TableEnvironment tEnv = null;
+
+  @BeforeAll
+  public static void startMetastore() {
+    TestBase.metastore = new TestHiveMetastore();
+    metastore.start();
+    TestBase.hiveConf = metastore.hiveConf();
+    TestBase.catalog =
+        (HiveCatalog)
+            CatalogUtil.loadCatalog(
+                HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf);
+  }
+
+  @AfterAll
+  public static void stopMetastore() throws Exception {
+    metastore.stop();
+    TestBase.catalog = null;
+  }
+
+  protected TableEnvironment getTableEnv() {
+    if (tEnv == null) {
+      synchronized (this) {
+        if (tEnv == null) {
+          EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+
+          TableEnvironment env = TableEnvironment.create(settings);
+          env.getConfig()
+              .getConfiguration()
+              .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);
+          tEnv = env;
+        }
+      }
+    }
+    return tEnv;
+  }
+
+  protected static TableResult exec(TableEnvironment env, String query, Object... args) {
+    return env.executeSql(String.format(query, args));
+  }
+
+  protected TableResult exec(String query, Object... args) {
+    return exec(getTableEnv(), query, args);
+  }
+
+  protected List<Row> sql(String query, Object... args) {
+    TableResult tableResult = exec(query, args);
+    try (CloseableIterator<Row> iter = tableResult.collect()) {
+      return Lists.newArrayList(iter);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to collect table result", e);
+    }
+  }
+
+  protected void assertSameElements(Iterable<Row> expected, Iterable<Row> actual) {
+    Assertions.assertThat(actual).isNotNull().containsExactlyInAnyOrderElementsOf(expected);
+  }
+
+  protected void assertSameElements(String message, Iterable<Row> expected, Iterable<Row> actual) {
+    Assertions.assertThat(actual)
+        .isNotNull()
+        .as(message)
+        .containsExactlyInAnyOrderElementsOf(expected);
+  }
+
+  /**
+   * We cannot drop the currently used catalog after FLINK-29677, so we have to make sure that we
+   * are not using the current catalog before dropping it. This method switches to the
+   * 'default_catalog' and drops the requested one.
+   *
+   * @param catalogName The catalog to drop
+   * @param ifExists Whether to add 'IF EXISTS' to the DROP CATALOG statement
+   */
+  protected void dropCatalog(String catalogName, boolean ifExists) {
+    sql("USE CATALOG default_catalog");
+    sql("DROP CATALOG %s %s", ifExists ? "IF EXISTS" : "", catalogName);
+  }
+}
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
index b3a2d45261..147d2a173d 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
@@ -35,32 +35,33 @@ import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
 import org.assertj.core.api.Assertions;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 
 /** Test for {@link TableLoader}. */
-public class TestCatalogTableLoader extends FlinkTestBase {
+public class TestCatalogTableLoader extends TestBase {
 
   private static File warehouse = null;
   private static final TableIdentifier IDENTIFIER = TableIdentifier.of("default", "my_table");
   private static final Schema SCHEMA =
       new Schema(Types.NestedField.required(1, "f1", Types.StringType.get()));
 
-  @BeforeClass
+  @BeforeAll
   public static void createWarehouse() throws IOException {
     warehouse = File.createTempFile("warehouse", null);
-    Assert.assertTrue(warehouse.delete());
+    Assertions.assertThat(warehouse.delete()).isTrue();
     hiveConf.set("my_key", "my_value");
   }
 
-  @AfterClass
+  @AfterAll
   public static void dropWarehouse() throws IOException {
     if (warehouse != null && warehouse.exists()) {
       Path warehousePath = new Path(warehouse.getAbsolutePath());
       FileSystem fs = warehousePath.getFileSystem(hiveConf);
-      Assert.assertTrue("Failed to delete " + warehousePath, fs.delete(warehousePath, true));
+      Assertions.assertThat(fs.delete(warehousePath, true))
+          .as("Failed to delete " + warehousePath)
+          .isTrue();
     }
   }
 
@@ -97,7 +98,7 @@ public class TestCatalogTableLoader extends FlinkTestBase {
         .as("FileIO should be a HadoopFileIO")
         .isInstanceOf(HadoopFileIO.class);
     HadoopFileIO hadoopIO = (HadoopFileIO) io;
-    Assert.assertEquals("my_value", hadoopIO.conf().get("my_key"));
+    Assertions.assertThat(hadoopIO.conf().get("my_key")).isEqualTo("my_value");
   }
 
   @SuppressWarnings("unchecked")
