This is an automated email from the ASF dual-hosted git repository.
zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6036be892 [core] Paimon catalog support immutable options (#1886)
6036be892 is described below
commit 6036be8929e8129d50fd3e45262875f511ad44d1
Author: GuojunLi <[email protected]>
AuthorDate: Fri Sep 1 10:53:31 2023 +0800
[core] Paimon catalog support immutable options (#1886)
* [core] Paimon catalog support immutable options
---
.../generated/catalog_configuration.html | 12 ++
.../org/apache/paimon/options/CatalogOptions.java | 37 +++++
.../org/apache/paimon/catalog/CatalogFactory.java | 29 ++++
.../paimon/catalog/CatalogOptionsManager.java | 90 ++++++++++
.../paimon/catalog/CatalogOptionsManagerTest.java | 183 +++++++++++++++++++++
5 files changed, 351 insertions(+)
diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html
b/docs/layouts/shortcodes/generated/catalog_configuration.html
index d243737a8..4856af9fd 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -26,6 +26,12 @@ under the License.
</tr>
</thead>
<tbody>
+ <tr>
+ <td><h5>data-lineage</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether save the data lineage information or not, by default
is false.</td>
+ </tr>
<tr>
<td><h5>fs.allow-hadoop-fallback</h5></td>
<td style="word-wrap: break-word;">true</td>
@@ -62,6 +68,12 @@ under the License.
<td>String</td>
<td>Metastore of paimon catalog, supports filesystem and hive.</td>
</tr>
+ <tr>
+ <td><h5>table-lineage</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether save the table lineage information or not, by default
is false.</td>
+ </tr>
<tr>
<td><h5>table.type</h5></td>
<td style="word-wrap: break-word;">managed</td>
diff --git
a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index 0fba499dd..353a81061 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -18,11 +18,15 @@
package org.apache.paimon.options;
+import org.apache.paimon.annotation.Documentation;
import org.apache.paimon.options.description.Description;
import org.apache.paimon.options.description.TextElement;
import org.apache.paimon.table.TableType;
+import java.lang.reflect.Field;
import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
import static org.apache.paimon.options.ConfigOptions.key;
@@ -78,6 +82,7 @@ public class CatalogOptions {
.withDescription(
"Allow to fallback to hadoop File IO when no file
io found for the scheme.");
+ @Documentation.Immutable
public static final ConfigOption<String> LINEAGE_META =
key("lineage-meta")
.stringType()
@@ -97,4 +102,36 @@ public class CatalogOptions {
TextElement.text(
"\"custom\": You can
implement LineageMetaFactory and LineageMeta to store lineage information in
customized storage."))
.build());
+
+ @Documentation.Immutable
+ public static final ConfigOption<Boolean> TABLE_LINEAGE =
+ key("table-lineage")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether save the table lineage information or
not, by default is false.");
+
+ @Documentation.Immutable
+ public static final ConfigOption<Boolean> DATA_LINEAGE =
+ key("data-lineage")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether save the data lineage information or not,
by default is false.");
+
+ public static Set<String> getImmutableOptionKeys() {
+ final Field[] fields = CatalogOptions.class.getFields();
+ final Set<String> immutableKeys = new HashSet<>(fields.length);
+ for (Field field : fields) {
+ if (ConfigOption.class.isAssignableFrom(field.getType())
+ && field.getAnnotation(Documentation.Immutable.class) !=
null) {
+ try {
+ immutableKeys.add(((ConfigOption<?>)
field.get(CatalogOptions.class)).key());
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ return immutableKeys;
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
index 8a6bab4b1..c574eeb62 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
@@ -23,11 +23,17 @@ import org.apache.paimon.factories.Factory;
import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
import org.apache.paimon.utils.Preconditions;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import static
org.apache.paimon.catalog.CatalogOptionsManager.validateCatalogOptions;
import static org.apache.paimon.options.CatalogOptions.METASTORE;
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -82,10 +88,33 @@ public interface CatalogFactory extends Factory {
} else {
fileIO.mkdirs(warehousePath);
}
+
+ // persist immutable catalog options
+ CatalogOptionsManager catalogOptionsManager =
+ new CatalogOptionsManager(fileIO, new Path(warehouse));
+ Map<String, String> immutableOptions =
+ immutableOptions(context,
CatalogOptions.getImmutableOptionKeys());
+ if (fileIO.exists(catalogOptionsManager.getCatalogOptionPath())) {
+ Map<String, String> originImmutableOptions =
+ catalogOptionsManager.immutableOptions();
+ validateCatalogOptions(
+ Options.fromMap(originImmutableOptions),
Options.fromMap(immutableOptions));
+ } else {
+ catalogOptionsManager.saveImmutableOptions(immutableOptions);
+ }
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return catalogFactory.create(fileIO, warehousePath, context);
}
+
+ static Map<String, String> immutableOptions(
+ CatalogContext context, Set<String> immutableCatalogOptionKeys) {
+ Map<String, String> immutableOptions = new HashMap<>();
+ context.options().keySet().stream()
+ .filter(key -> immutableCatalogOptionKeys.contains(key))
+ .forEach(key -> immutableOptions.put(key,
context.options().get(key)));
+ return immutableOptions;
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogOptionsManager.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogOptionsManager.java
new file mode 100644
index 000000000..542d8b7b4
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogOptionsManager.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.catalog;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Map;
+
+import static org.apache.paimon.options.CatalogOptions.DATA_LINEAGE;
+import static org.apache.paimon.options.CatalogOptions.TABLE_LINEAGE;
+
+/** Manage read and write of immutable {@link
org.apache.paimon.options.CatalogOptions}. */
+public class CatalogOptionsManager {
+ private static final String OPTIONS = "options";
+ private final FileIO fileIO;
+ private final Path warehouse;
+
+ public CatalogOptionsManager(FileIO fileIO, Path warehouse) {
+ this.fileIO = fileIO;
+ this.warehouse = warehouse;
+ }
+
+ boolean saveImmutableOptions(Map<String, String> immutableOptions) throws
IOException {
+ Path catalogOptionPath = getCatalogOptionPath();
+ return fileIO.writeFileUtf8(catalogOptionPath,
JsonSerdeUtil.toJson(immutableOptions));
+ }
+
+ /** Read immutable catalog options. */
+ Map<String, String> immutableOptions() {
+ try {
+ return
JsonSerdeUtil.fromJson(fileIO.readFileUtf8(getCatalogOptionPath()), Map.class);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ public Path getCatalogOptionPath() {
+ return new Path(warehouse + "/options/" + OPTIONS);
+ }
+
+ /**
+ * Validate the {@link org.apache.paimon.options.CatalogOptions}.
+ *
+ * @param originImmutableOptions the origin persisted immutable catalog
options
+ * @param newImmutableOptions the new immutable catalog options
+ */
+ public static void validateCatalogOptions(
+ Options originImmutableOptions, Options newImmutableOptions) {
+ // Only open data-lineage without open table-lineage is not supported,
data-lineage is
+ // depend on table-lineage.
+ if (newImmutableOptions.get(DATA_LINEAGE) &&
!newImmutableOptions.get(TABLE_LINEAGE)) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Can not open %s without %s opened, please set
both of them to TRUE or remove %s.",
+ CatalogOptions.DATA_LINEAGE.key(),
+ CatalogOptions.TABLE_LINEAGE.key(),
+ CatalogOptions.DATA_LINEAGE.key()));
+ }
+
+ // check immutable catalog options
+ if (originImmutableOptions != null &&
!originImmutableOptions.equals(newImmutableOptions)) {
+ throw new IllegalStateException(
+ String.format(
+ "The immutable catalog options changed, origin
options are %s, new options are %s.",
+ originImmutableOptions.toMap(),
newImmutableOptions.toMap()));
+ }
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogOptionsManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogOptionsManagerTest.java
new file mode 100644
index 000000000..44891d2ec
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogOptionsManagerTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.catalog;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+
+import static org.apache.paimon.options.CatalogOptions.DATA_LINEAGE;
+import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
+import static org.apache.paimon.options.CatalogOptions.TABLE_LINEAGE;
+import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+/** Tests for {@link CatalogOptionsManager}. */
+public class CatalogOptionsManagerTest {
+ @TempDir static java.nio.file.Path tempDir;
+ private static Options withImmutableOptions = new Options();
+ private static Options withoutImmutableOptions = new Options();
+ private static Options withPartOfImmutableOptions = new Options();
+
+ private static CatalogContext catalogContext;
+
+ private static FileIO fileIO;
+
+ @BeforeAll
+ public static void beforeAll() throws IOException {
+ withImmutableOptions.set(TABLE_LINEAGE, true);
+ withImmutableOptions.set(DATA_LINEAGE, true);
+ withImmutableOptions.set(WAREHOUSE, tempDir.toString());
+
+ withoutImmutableOptions.set(WAREHOUSE, tempDir.toString());
+
+ withPartOfImmutableOptions.set(TABLE_LINEAGE, true);
+ withPartOfImmutableOptions.set(WAREHOUSE, tempDir.toString());
+
+ catalogContext = CatalogContext.create(withoutImmutableOptions);
+ fileIO = FileIO.get(new Path(tempDir.toString()), catalogContext);
+ }
+
+ @AfterEach
+ public void afterEach() throws IOException {
+ cleanCatalog();
+ }
+
+ @Test
+ public void testSaveImmutableCatalogOptions() throws IOException {
+ String warehouse = tempDir.toString();
+
+ CatalogOptionsManager catalogOptionsManager =
+ new CatalogOptionsManager(fileIO, new Path(warehouse));
+
CatalogFactory.createCatalog(CatalogContext.create(withImmutableOptions));
+
assertThat(fileIO.exists(catalogOptionsManager.getCatalogOptionPath()));
+ assertThat(withImmutableOptions.toMap())
+
.containsAllEntriesOf(catalogOptionsManager.immutableOptions());
+ }
+
+ @Test
+ public void testCatalogOptionsValidate() {
+ Options newImmutableCatalogOptions = new Options();
+ assertDoesNotThrow(
+ () ->
+ CatalogOptionsManager.validateCatalogOptions(
+ null, newImmutableCatalogOptions));
+ newImmutableCatalogOptions.set(TABLE_LINEAGE, true);
+ assertDoesNotThrow(
+ () ->
+ CatalogOptionsManager.validateCatalogOptions(
+ null, newImmutableCatalogOptions));
+ newImmutableCatalogOptions.set(DATA_LINEAGE, true);
+ assertDoesNotThrow(
+ () ->
+ CatalogOptionsManager.validateCatalogOptions(
+ null, newImmutableCatalogOptions));
+ newImmutableCatalogOptions.set(TABLE_LINEAGE, false);
+ assertThatThrownBy(
+ () ->
+ CatalogOptionsManager.validateCatalogOptions(
+ null, newImmutableCatalogOptions))
+ .isInstanceOf(UnsupportedOperationException.class);
+ newImmutableCatalogOptions.set(TABLE_LINEAGE, true);
+
+ Options originImmutableCatalogOptions = new Options();
+ assertThatThrownBy(
+ () ->
+ CatalogOptionsManager.validateCatalogOptions(
+ originImmutableCatalogOptions,
newImmutableCatalogOptions))
+ .isInstanceOf(IllegalStateException.class);
+ originImmutableCatalogOptions.set(TABLE_LINEAGE, true);
+ assertThatThrownBy(
+ () ->
+ CatalogOptionsManager.validateCatalogOptions(
+ originImmutableCatalogOptions,
newImmutableCatalogOptions))
+ .isInstanceOf(IllegalStateException.class);
+ originImmutableCatalogOptions.set(DATA_LINEAGE, true);
+ assertDoesNotThrow(
+ () ->
+ CatalogOptionsManager.validateCatalogOptions(
+ originImmutableCatalogOptions,
newImmutableCatalogOptions));
+ }
+
+ @Test
+ public void testCreatingCatalogWithConflictOptions() throws IOException {
+ // session1: without immutable options, session2: with immutable
options, throw
+ // IllegalStateException
+
CatalogFactory.createCatalog(CatalogContext.create(withoutImmutableOptions));
+ assertThatThrownBy(
+ () ->
+ CatalogFactory.createCatalog(
+
CatalogContext.create(withImmutableOptions)))
+ .isInstanceOf(IllegalStateException.class);
+ cleanCatalog();
+
+ // session1: without immutable options, session2: without immutable
options, succeeded
+
CatalogFactory.createCatalog(CatalogContext.create(withoutImmutableOptions));
+ assertDoesNotThrow(
+ () ->
CatalogFactory.createCatalog(CatalogContext.create(withoutImmutableOptions)));
+ cleanCatalog();
+
+ // session1: with immutable options, session2: with the same immutable
options, succeeded
+
CatalogFactory.createCatalog(CatalogContext.create(withImmutableOptions));
+ assertDoesNotThrow(
+ () ->
CatalogFactory.createCatalog(CatalogContext.create(withImmutableOptions)));
+ cleanCatalog();
+
+ // session1: with immutable options, session2: with different
immutable options, throw
+ // IllegalStateException
+
CatalogFactory.createCatalog(CatalogContext.create(withImmutableOptions));
+ assertThatThrownBy(
+ () ->
+ CatalogFactory.createCatalog(
+
CatalogContext.create(withPartOfImmutableOptions)))
+ .isInstanceOf(IllegalStateException.class);
+ cleanCatalog();
+
+ // session1: with immutable options, session2: without immutable
options, throw
+ // IllegalStateException
+
CatalogFactory.createCatalog(CatalogContext.create(withImmutableOptions));
+ assertThatThrownBy(
+ () ->
+ CatalogFactory.createCatalog(
+
CatalogContext.create(withoutImmutableOptions)))
+ .isInstanceOf(IllegalStateException.class);
+ cleanCatalog();
+ }
+
+ @Test
+ public void testCatalogImmutableOptionKeys() {
+ assertThat(CatalogOptions.getImmutableOptionKeys())
+ .containsExactlyInAnyOrder(
+ LINEAGE_META.key(), TABLE_LINEAGE.key(),
DATA_LINEAGE.key());
+ }
+
+ private static void cleanCatalog() throws IOException {
+ fileIO.delete(new Path(tempDir.toString()), true);
+ }
+}