This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new f246614ed9 Flink: Backport #9078 to v1.16 and v1.15 (#9151)
f246614ed9 is described below
commit f246614ed9d98f84eae13ff57768d377a95d3ced
Author: CG <[email protected]>
AuthorDate: Mon Nov 27 18:16:19 2023 +0800
Flink: Backport #9078 to v1.16 and v1.15 (#9151)
---
.../iceberg/flink/MiniFlinkClusterExtension.java | 53 +++++++++
.../java/org/apache/iceberg/flink/TestBase.java | 130 +++++++++++++++++++++
.../iceberg/flink/TestCatalogTableLoader.java | 21 ++--
.../iceberg/flink/MiniFlinkClusterExtension.java | 53 +++++++++
.../java/org/apache/iceberg/flink/TestBase.java | 130 +++++++++++++++++++++
.../iceberg/flink/TestCatalogTableLoader.java | 21 ++--
6 files changed, 388 insertions(+), 20 deletions(-)
diff --git
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/MiniFlinkClusterExtension.java
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/MiniFlinkClusterExtension.java
new file mode 100644
index 0000000000..9a73b80e07
--- /dev/null
+++
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/MiniFlinkClusterExtension.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+public class MiniFlinkClusterExtension {
+
+ private static final int DEFAULT_TM_NUM = 1;
+ private static final int DEFAULT_PARALLELISM = 4;
+
+ public static final Configuration DISABLE_CLASSLOADER_CHECK_CONFIG =
+ new Configuration()
+ // disable classloader check as Avro may cache class/object in the
serializers.
+ .set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
+
+ private MiniFlinkClusterExtension() {}
+
+ /**
+ * It will start a mini cluster with
classloader.check-leaked-classloader=false, so that we won't
+ * break the unit tests because of the class loader leak issue. In our
iceberg integration tests,
+ * there're some that will assert the results after finished the flink jobs,
so actually we may
+ * access the class loader that has been closed by the flink task managers
if we enable the switch
+ * classloader.check-leaked-classloader by default.
+ */
+ public static MiniClusterExtension createWithClassloaderCheckDisabled() {
+ return new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(DEFAULT_TM_NUM)
+ .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+ .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
+ .build());
+ }
+}
diff --git
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
new file mode 100644
index 0000000000..4fc0207f26
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+public abstract class TestBase extends TestBaseUtils {
+
+ @RegisterExtension
+ public static MiniClusterExtension miniClusterResource =
+ MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
+
+ @TempDir Path temporaryDirectory;
+
+ private static TestHiveMetastore metastore = null;
+ protected static HiveConf hiveConf = null;
+ protected static HiveCatalog catalog = null;
+
+ private volatile TableEnvironment tEnv = null;
+
+ @BeforeAll
+ public static void startMetastore() {
+ TestBase.metastore = new TestHiveMetastore();
+ metastore.start();
+ TestBase.hiveConf = metastore.hiveConf();
+ TestBase.catalog =
+ (HiveCatalog)
+ CatalogUtil.loadCatalog(
+ HiveCatalog.class.getName(), "hive", ImmutableMap.of(),
hiveConf);
+ }
+
+ @AfterAll
+ public static void stopMetastore() throws Exception {
+ metastore.stop();
+ TestBase.catalog = null;
+ }
+
+ protected TableEnvironment getTableEnv() {
+ if (tEnv == null) {
+ synchronized (this) {
+ if (tEnv == null) {
+ EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode().build();
+
+ TableEnvironment env = TableEnvironment.create(settings);
+ env.getConfig()
+ .getConfiguration()
+
.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);
+ tEnv = env;
+ }
+ }
+ }
+ return tEnv;
+ }
+
+ protected static TableResult exec(TableEnvironment env, String query,
Object... args) {
+ return env.executeSql(String.format(query, args));
+ }
+
+ protected TableResult exec(String query, Object... args) {
+ return exec(getTableEnv(), query, args);
+ }
+
+ protected List<Row> sql(String query, Object... args) {
+ TableResult tableResult = exec(query, args);
+ try (CloseableIterator<Row> iter = tableResult.collect()) {
+ return Lists.newArrayList(iter);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to collect table result", e);
+ }
+ }
+
+ protected void assertSameElements(Iterable<Row> expected, Iterable<Row>
actual) {
+
Assertions.assertThat(actual).isNotNull().containsExactlyInAnyOrderElementsOf(expected);
+ }
+
+ protected void assertSameElements(String message, Iterable<Row> expected,
Iterable<Row> actual) {
+ Assertions.assertThat(actual)
+ .isNotNull()
+ .as(message)
+ .containsExactlyInAnyOrderElementsOf(expected);
+ }
+
+ /**
+ * We cannot drop the currently used catalog after FLINK-29677, so we have to
make sure that we do not
+ * use the current catalog before dropping it. This method switches to the
'default_catalog' and
+ * drops the one requested.
+ *
+ * @param catalogName The catalog to drop
+ * @param ifExists If we should use the 'IF EXISTS' when dropping the catalog
+ */
+ protected void dropCatalog(String catalogName, boolean ifExists) {
+ sql("USE CATALOG default_catalog");
+ sql("DROP CATALOG %s %s", ifExists ? "IF EXISTS" : "", catalogName);
+ }
+}
diff --git
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
index b3a2d45261..147d2a173d 100644
---
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
+++
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
@@ -35,32 +35,33 @@ import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
/** Test for {@link TableLoader}. */
-public class TestCatalogTableLoader extends FlinkTestBase {
+public class TestCatalogTableLoader extends TestBase {
private static File warehouse = null;
private static final TableIdentifier IDENTIFIER =
TableIdentifier.of("default", "my_table");
private static final Schema SCHEMA =
new Schema(Types.NestedField.required(1, "f1", Types.StringType.get()));
- @BeforeClass
+ @BeforeAll
public static void createWarehouse() throws IOException {
warehouse = File.createTempFile("warehouse", null);
- Assert.assertTrue(warehouse.delete());
+ Assertions.assertThat(warehouse.delete()).isTrue();
hiveConf.set("my_key", "my_value");
}
- @AfterClass
+ @AfterAll
public static void dropWarehouse() throws IOException {
if (warehouse != null && warehouse.exists()) {
Path warehousePath = new Path(warehouse.getAbsolutePath());
FileSystem fs = warehousePath.getFileSystem(hiveConf);
- Assert.assertTrue("Failed to delete " + warehousePath,
fs.delete(warehousePath, true));
+ Assertions.assertThat(fs.delete(warehousePath, true))
+ .as("Failed to delete " + warehousePath)
+ .isTrue();
}
}
@@ -97,7 +98,7 @@ public class TestCatalogTableLoader extends FlinkTestBase {
.as("FileIO should be a HadoopFileIO")
.isInstanceOf(HadoopFileIO.class);
HadoopFileIO hadoopIO = (HadoopFileIO) io;
- Assert.assertEquals("my_value", hadoopIO.conf().get("my_key"));
+ Assertions.assertThat(hadoopIO.conf().get("my_key")).isEqualTo("my_value");
}
@SuppressWarnings("unchecked")
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/MiniFlinkClusterExtension.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/MiniFlinkClusterExtension.java
new file mode 100644
index 0000000000..9a73b80e07
--- /dev/null
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/MiniFlinkClusterExtension.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+public class MiniFlinkClusterExtension {
+
+ private static final int DEFAULT_TM_NUM = 1;
+ private static final int DEFAULT_PARALLELISM = 4;
+
+ public static final Configuration DISABLE_CLASSLOADER_CHECK_CONFIG =
+ new Configuration()
+ // disable classloader check as Avro may cache class/object in the
serializers.
+ .set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
+
+ private MiniFlinkClusterExtension() {}
+
+ /**
+ * It will start a mini cluster with
classloader.check-leaked-classloader=false, so that we won't
+ * break the unit tests because of the class loader leak issue. In our
iceberg integration tests,
+ * there're some that will assert the results after finished the flink jobs,
so actually we may
+ * access the class loader that has been closed by the flink task managers
if we enable the switch
+ * classloader.check-leaked-classloader by default.
+ */
+ public static MiniClusterExtension createWithClassloaderCheckDisabled() {
+ return new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(DEFAULT_TM_NUM)
+ .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+ .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
+ .build());
+ }
+}
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
new file mode 100644
index 0000000000..4fc0207f26
--- /dev/null
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+public abstract class TestBase extends TestBaseUtils {
+
+ @RegisterExtension
+ public static MiniClusterExtension miniClusterResource =
+ MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();
+
+ @TempDir Path temporaryDirectory;
+
+ private static TestHiveMetastore metastore = null;
+ protected static HiveConf hiveConf = null;
+ protected static HiveCatalog catalog = null;
+
+ private volatile TableEnvironment tEnv = null;
+
+ @BeforeAll
+ public static void startMetastore() {
+ TestBase.metastore = new TestHiveMetastore();
+ metastore.start();
+ TestBase.hiveConf = metastore.hiveConf();
+ TestBase.catalog =
+ (HiveCatalog)
+ CatalogUtil.loadCatalog(
+ HiveCatalog.class.getName(), "hive", ImmutableMap.of(),
hiveConf);
+ }
+
+ @AfterAll
+ public static void stopMetastore() throws Exception {
+ metastore.stop();
+ TestBase.catalog = null;
+ }
+
+ protected TableEnvironment getTableEnv() {
+ if (tEnv == null) {
+ synchronized (this) {
+ if (tEnv == null) {
+ EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode().build();
+
+ TableEnvironment env = TableEnvironment.create(settings);
+ env.getConfig()
+ .getConfiguration()
+
.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);
+ tEnv = env;
+ }
+ }
+ }
+ return tEnv;
+ }
+
+ protected static TableResult exec(TableEnvironment env, String query,
Object... args) {
+ return env.executeSql(String.format(query, args));
+ }
+
+ protected TableResult exec(String query, Object... args) {
+ return exec(getTableEnv(), query, args);
+ }
+
+ protected List<Row> sql(String query, Object... args) {
+ TableResult tableResult = exec(query, args);
+ try (CloseableIterator<Row> iter = tableResult.collect()) {
+ return Lists.newArrayList(iter);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to collect table result", e);
+ }
+ }
+
+ protected void assertSameElements(Iterable<Row> expected, Iterable<Row>
actual) {
+
Assertions.assertThat(actual).isNotNull().containsExactlyInAnyOrderElementsOf(expected);
+ }
+
+ protected void assertSameElements(String message, Iterable<Row> expected,
Iterable<Row> actual) {
+ Assertions.assertThat(actual)
+ .isNotNull()
+ .as(message)
+ .containsExactlyInAnyOrderElementsOf(expected);
+ }
+
+ /**
+ * We cannot drop the currently used catalog after FLINK-29677, so we have to
make sure that we do not
+ * use the current catalog before dropping it. This method switches to the
'default_catalog' and
+ * drops the one requested.
+ *
+ * @param catalogName The catalog to drop
+ * @param ifExists If we should use the 'IF EXISTS' when dropping the catalog
+ */
+ protected void dropCatalog(String catalogName, boolean ifExists) {
+ sql("USE CATALOG default_catalog");
+ sql("DROP CATALOG %s %s", ifExists ? "IF EXISTS" : "", catalogName);
+ }
+}
diff --git
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
index b3a2d45261..147d2a173d 100644
---
a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
+++
b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestCatalogTableLoader.java
@@ -35,32 +35,33 @@ import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
/** Test for {@link TableLoader}. */
-public class TestCatalogTableLoader extends FlinkTestBase {
+public class TestCatalogTableLoader extends TestBase {
private static File warehouse = null;
private static final TableIdentifier IDENTIFIER =
TableIdentifier.of("default", "my_table");
private static final Schema SCHEMA =
new Schema(Types.NestedField.required(1, "f1", Types.StringType.get()));
- @BeforeClass
+ @BeforeAll
public static void createWarehouse() throws IOException {
warehouse = File.createTempFile("warehouse", null);
- Assert.assertTrue(warehouse.delete());
+ Assertions.assertThat(warehouse.delete()).isTrue();
hiveConf.set("my_key", "my_value");
}
- @AfterClass
+ @AfterAll
public static void dropWarehouse() throws IOException {
if (warehouse != null && warehouse.exists()) {
Path warehousePath = new Path(warehouse.getAbsolutePath());
FileSystem fs = warehousePath.getFileSystem(hiveConf);
- Assert.assertTrue("Failed to delete " + warehousePath,
fs.delete(warehousePath, true));
+ Assertions.assertThat(fs.delete(warehousePath, true))
+ .as("Failed to delete " + warehousePath)
+ .isTrue();
}
}
@@ -97,7 +98,7 @@ public class TestCatalogTableLoader extends FlinkTestBase {
.as("FileIO should be a HadoopFileIO")
.isInstanceOf(HadoopFileIO.class);
HadoopFileIO hadoopIO = (HadoopFileIO) io;
- Assert.assertEquals("my_value", hadoopIO.conf().get("my_key"));
+ Assertions.assertThat(hadoopIO.conf().get("my_key")).isEqualTo("my_value");
}
@SuppressWarnings("unchecked")