kbendick commented on a change in pull request #3308:
URL: https://github.com/apache/iceberg/pull/3308#discussion_r731511402
##########
File path: site/docs/flink.md
##########
@@ -206,7 +206,7 @@ CREATE CATALOG hive_catalog WITH (
```
* `type`: Please just use `iceberg` for iceberg table format. (Required)
-* `catalog-type`: Iceberg currently support `hive` or `hadoop` catalog type. (Required)
+* `catalog-type`: Iceberg currently support `hive` or `hadoop` catalog type. In case using custom catalog, this should be left unset. (Required)
Review comment:
Also +1 to the use of `when`. I would personally write something along the lines of: "Iceberg currently supports `hive` or `hadoop` catalog types, as well as custom catalogs. When using a custom catalog, this should be left unset. (Required)"
The phrasing is totally up to you, but I thought I'd throw it out there as I thought about it 🙂
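For concreteness, the suggested bullet would render in `flink.md` roughly as follows (the exact wording is just a suggestion):

```markdown
* `catalog-type`: Iceberg currently supports `hive` or `hadoop` catalog types, as well as custom catalogs. When using a custom catalog, this should be left unset. (Required)
```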
##########
File path:
flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogFactory.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestFlinkCatalogFactory {
+
+ private Map<String, String> props;
+
+ @Before
+ public void before() {
+ props = Maps.newHashMap();
+ props.put("type", "iceberg");
+ props.put(CatalogProperties.WAREHOUSE_LOCATION, "/tmp/location");
+ }
+
+ @Test
+ public void testCreateCreateCatalogHive() {
+ String catalogName = "hiveCatalog";
+    props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, FlinkCatalogFactory.ICEBERG_CATALOG_TYPE_HIVE);
+
+ Catalog catalog = FlinkCatalogFactory
+ .createCatalogLoader(catalogName, props, new Configuration())
+ .loadCatalog();
+
+ Assert.assertNotNull(catalog);
+ Assertions.assertThat(catalog).isInstanceOf(HiveCatalog.class);
+ }
+
+ @Test
+ public void testCreateCreateCatalogHadoop() {
+ String catalogName = "hadoopCatalog";
+    props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, FlinkCatalogFactory.ICEBERG_CATALOG_TYPE_HADOOP);
+
+ Catalog catalog = FlinkCatalogFactory
+ .createCatalogLoader(catalogName, props, new Configuration())
+ .loadCatalog();
+
+ Assert.assertNotNull(catalog);
+ Assertions.assertThat(catalog).isInstanceOf(HadoopCatalog.class);
+ }
+
+ @Test
+ public void testCreateCreateCatalogCustom() {
+ String catalogName = "customCatalog";
+    props.put(CatalogProperties.CATALOG_IMPL, CustomHadoopCatalog.class.getName());
+
+ Catalog catalog = FlinkCatalogFactory
+ .createCatalogLoader(catalogName, props, new Configuration())
+ .loadCatalog();
+
+ Assert.assertNotNull(catalog);
+ Assertions.assertThat(catalog).isInstanceOf(CustomHadoopCatalog.class);
+ }
+
+ @Test
+ public void testCreateCreateCatalogCustomWithHiveCatalogTypeSet() {
+ String catalogName = "customCatalog";
+    props.put(CatalogProperties.CATALOG_IMPL, CustomHadoopCatalog.class.getName());
+    props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, FlinkCatalogFactory.ICEBERG_CATALOG_TYPE_HIVE);
+
+    AssertHelpers.assertThrows("Should complain about both configs being set", IllegalArgumentException.class,
+        "both catalog-type and catalog-impl are set", () ->
+            FlinkCatalogFactory.createCatalogLoader(catalogName, props, new Configuration()));
+ }
+
+ @Test
+ public void testLoadCatalogUnknown() {
+ String catalogName = "unknownCatalog";
+ props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "fooType");
+
+    AssertHelpers.assertThrows(
+        "should complain about catalog not supported", UnsupportedOperationException.class,
+        "Unknown catalog type:", () ->
+            FlinkCatalogFactory.createCatalogLoader(catalogName, props, new Configuration())
Review comment:
Nit:
Could you please provide more descriptive error messages and test messages? We try for "what is the problem" + "what caused it" + "how to fix it", more or less. By no means is that a hard rule, but I'd try to be a bit more descriptive than this to match the convention (this is personally how I write my own tests too).
```java
AssertHelpers.assertThrows(
    "Should throw when an unregistered / unknown catalog is set as the catalog factory's `type` setting",
    UnsupportedOperationException.class,
    "Unknown catalog type:", () ->
        FlinkCatalogFactory.createCatalogLoader(catalogName, props, new Configuration())
);
```
I think this would help explain what the tests are saying, which helps
explain the code for others (including myself).
Thank you so much for your contribution btw @omarsmak
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
##########
@@ -83,6 +83,10 @@
 static CatalogLoader createCatalogLoader(String name, Map<String, String> properties, Configuration hadoopConf) {
   String catalogImpl = properties.get(CatalogProperties.CATALOG_IMPL);
   if (catalogImpl != null) {
+     String catalogType = properties.get(ICEBERG_CATALOG_TYPE);
+     Preconditions.checkArgument(catalogType == null,
+         "Cannot create catalog %s, both catalog-type and catalog-impl are set: catalog-type=%s, catalog-impl=%s",
Review comment:
Do we have recommendations on which values to set that we can provide?
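Not sure what the official recommendation should be, but as a self-contained sketch of what a message that also tells the user how to resolve the conflict might look like (`CatalogConfigCheck` and `validateExclusive` are hypothetical names for illustration, not part of the PR):

```java
import java.util.HashMap;
import java.util.Map;

public class CatalogConfigCheck {

    // Hypothetical helper: rejects configurations that set both
    // catalog-type and catalog-impl, and says how to fix the conflict.
    static void validateExclusive(String name, Map<String, String> props) {
        String impl = props.get("catalog-impl");
        String type = props.get("catalog-type");
        if (impl != null && type != null) {
            throw new IllegalArgumentException(String.format(
                "Cannot create catalog %s: both catalog-type (%s) and catalog-impl (%s) are set. "
                    + "To fix, set only catalog-impl for a custom catalog, "
                    + "or only catalog-type for the built-in hive/hadoop catalogs.",
                name, type, impl));
        }
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("catalog-type", "hive");
        props.put("catalog-impl", "com.example.CustomCatalog");
        try {
            validateExclusive("my_catalog", props);
        } catch (IllegalArgumentException e) {
            // The message names the problem, the conflicting values, and the fix.
            System.out.println(e.getMessage());
        }
    }
}
```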
##########
File path:
flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogFactory.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestFlinkCatalogFactory {
+
+ private Map<String, String> props;
+
+ @Before
+ public void before() {
+ props = Maps.newHashMap();
+ props.put("type", "iceberg");
+ props.put(CatalogProperties.WAREHOUSE_LOCATION, "/tmp/location");
+ }
+
+ @Test
+ public void testCreateCreateCatalogHive() {
+ String catalogName = "hiveCatalog";
+    props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, FlinkCatalogFactory.ICEBERG_CATALOG_TYPE_HIVE);
+
+ Catalog catalog = FlinkCatalogFactory
+ .createCatalogLoader(catalogName, props, new Configuration())
+ .loadCatalog();
+
+ Assert.assertNotNull(catalog);
+ Assertions.assertThat(catalog).isInstanceOf(HiveCatalog.class);
+ }
+
+ @Test
+ public void testCreateCreateCatalogHadoop() {
+ String catalogName = "hadoopCatalog";
+    props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, FlinkCatalogFactory.ICEBERG_CATALOG_TYPE_HADOOP);
+
+ Catalog catalog = FlinkCatalogFactory
+ .createCatalogLoader(catalogName, props, new Configuration())
+ .loadCatalog();
+
+ Assert.assertNotNull(catalog);
+ Assertions.assertThat(catalog).isInstanceOf(HadoopCatalog.class);
+ }
+
+ @Test
+ public void testCreateCreateCatalogCustom() {
+ String catalogName = "customCatalog";
+    props.put(CatalogProperties.CATALOG_IMPL, CustomHadoopCatalog.class.getName());
+
+ Catalog catalog = FlinkCatalogFactory
+ .createCatalogLoader(catalogName, props, new Configuration())
+ .loadCatalog();
+
+ Assert.assertNotNull(catalog);
+ Assertions.assertThat(catalog).isInstanceOf(CustomHadoopCatalog.class);
+ }
+
+ @Test
+ public void testCreateCreateCatalogCustomWithHiveCatalogTypeSet() {
+ String catalogName = "customCatalog";
+    props.put(CatalogProperties.CATALOG_IMPL, CustomHadoopCatalog.class.getName());
+    props.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, FlinkCatalogFactory.ICEBERG_CATALOG_TYPE_HIVE);
+
+    AssertHelpers.assertThrows("Should complain about both configs being set", IllegalArgumentException.class,
Review comment:
Same note here as below about the assertion description 👍
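To make the convention concrete with a runnable sketch: `DescriptiveAssertDemo` and its `assertThrows` below are a minimal hypothetical stand-in for Iceberg's `AssertHelpers`, and the description wording is just a suggestion, not the required phrasing:

```java
public class DescriptiveAssertDemo {

    // Minimal stand-in for AssertHelpers.assertThrows, for illustration only:
    // passes when the runnable throws the expected type with a matching message.
    static void assertThrows(String description, Class<? extends Exception> expected,
                             String messageSubstring, Runnable runnable) {
        try {
            runnable.run();
        } catch (Exception e) {
            if (expected.isInstance(e) && e.getMessage().contains(messageSubstring)) {
                return;  // expected failure observed
            }
            throw new AssertionError(description + " (got unexpected " + e + ")");
        }
        throw new AssertionError(description + " (no exception was thrown)");
    }

    public static void main(String[] args) {
        // The description says what should happen and what triggers it.
        assertThrows(
            "Should throw when both catalog-type and catalog-impl are set, "
                + "since the two settings are mutually exclusive",
            IllegalArgumentException.class,
            "both catalog-type and catalog-impl are set",
            () -> {
                throw new IllegalArgumentException(
                    "Cannot create catalog: both catalog-type and catalog-impl are set");
            });
        System.out.println("assertion passed");
    }
}
```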
##########
File path: site/docs/flink.md
##########
@@ -206,7 +206,7 @@ CREATE CATALOG hive_catalog WITH (
```
* `type`: Please just use `iceberg` for iceberg table format. (Required)
-* `catalog-type`: Iceberg currently support `hive` or `hadoop` catalog type. (Required)
+* `catalog-type`: Iceberg currently support `hive` or `hadoop` catalog type. In case using custom catalog, this should be left unset. (Required)
Review comment:
I think it should. Given that `catalog-impl` is given an Iceberg type, it's definitely relevant here. It's just defined by the Flink API and not by our own Iceberg API, but it still belongs in these docs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]