This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a7cc25edb [flink] Should filter options for Flink HiveCatalog in 
FlinkGenericCatalogFactory (#3740)
a7cc25edb is described below

commit a7cc25edb5d026783f73bf80cab8b62717ba660d
Author: xiangyu0xf <xiangyu...@gmail.com>
AuthorDate: Mon Aug 5 12:07:33 2024 +0800

    [flink] Should filter options for Flink HiveCatalog in 
FlinkGenericCatalogFactory (#3740)
---
 .../paimon/flink/FlinkGenericCatalogFactory.java   | 25 +++++++-
 .../flink/FlinkGenericCatalogFactoryTest.java      | 67 ++++++++++++++++++++++
 2 files changed, 91 insertions(+), 1 deletion(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java
index d07779297..dc2a0f06b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java
@@ -23,11 +23,14 @@ import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
 
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -56,11 +59,31 @@ public class FlinkGenericCatalogFactory implements 
CatalogFactory {
     @Override
     public FlinkGenericCatalog createCatalog(Context context) {
         CatalogFactory hiveFactory = 
createHiveCatalogFactory(context.getClassLoader());
-        Catalog catalog = hiveFactory.createCatalog(context);
+        Context filteredContext = filterContextOptions(context, hiveFactory);
+        Catalog catalog = hiveFactory.createCatalog(filteredContext);
         return createCatalog(
                 context.getClassLoader(), context.getOptions(), 
context.getName(), catalog);
     }
 
+    @VisibleForTesting
+    public Context filterContextOptions(Context context, CatalogFactory 
catalogFactory) {
+        Set<ConfigOption<?>> catalogOptions = new 
HashSet<>(catalogFactory.requiredOptions());
+        catalogOptions.addAll(catalogFactory.optionalOptions());
+        Map<String, String> contextOptions = context.getOptions();
+        Map<String, String> flinkCatalogOptions = new HashMap<>();
+        catalogOptions.forEach(
+                option -> {
+                    if (contextOptions.containsKey(option.key())) {
+                        flinkCatalogOptions.put(option.key(), 
contextOptions.get(option.key()));
+                    }
+                });
+        return new FactoryUtil.DefaultCatalogContext(
+                context.getName(),
+                flinkCatalogOptions,
+                context.getConfiguration(),
+                context.getClassLoader());
+    }
+
     @VisibleForTesting
     public static FlinkGenericCatalog createCatalog(
             ClassLoader cl, Map<String, String> optionMap, String name, 
Catalog flinkCatalog) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkGenericCatalogFactoryTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkGenericCatalogFactoryTest.java
new file mode 100644
index 000000000..3ed2d8183
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkGenericCatalogFactoryTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.TestCatalogFactory;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link FlinkGenericCatalogFactory}. */
+public class FlinkGenericCatalogFactoryTest {
+
+    private final FlinkGenericCatalogFactory genericCatalogFactory =
+            new FlinkGenericCatalogFactory();
+
+    @TempDir public static java.nio.file.Path temporaryFolder;
+
+    @Test
+    public void testGenericCatalogOptionsFilter() {
+        String path1 = new File(temporaryFolder.toFile(), 
UUID.randomUUID().toString()).toString();
+        String path2 = new File(temporaryFolder.toFile(), 
UUID.randomUUID().toString()).toString();
+
+        TestCatalogFactory testCatalogFactory = new TestCatalogFactory();
+        String catalogName = "test-catalog";
+        Map<String, String> options = new HashMap<>();
+        options.put("warehouse", path1);
+        options.put(TestCatalogFactory.DEFAULT_DATABASE.key(), path2);
+        CatalogFactory.Context context =
+                new FactoryUtil.DefaultCatalogContext(
+                        catalogName,
+                        options,
+                        null,
+                        FlinkGenericCatalogFactoryTest.class.getClassLoader());
+
+        CatalogFactory.Context flinkContext =
+                genericCatalogFactory.filterContextOptions(context, 
testCatalogFactory);
+
+        Map<String, String> flinkOptions = flinkContext.getOptions();
+        assertThat(flinkOptions.get(TestCatalogFactory.DEFAULT_DATABASE.key()))
+                
.isEqualTo(options.get(TestCatalogFactory.DEFAULT_DATABASE.key()));
+        assertThat(flinkOptions.get("warehouse")).isNull();
+    }
+}

Reply via email to