This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a6b2c6c0c [core] migrate CoeOptions to common module
a6b2c6c0c is described below

commit a6b2c6c0c9b6da3de8cda542dbbb46513fea3d76
Author: wgcn <[email protected]>
AuthorDate: Sat Jul 15 16:10:50 2023 +0800

    [core] migrate CoeOptions to common module
    
    This closes #1569
---
 .../main/java/org/apache/paimon/CoreOptions.java   | 34 +++---------------
 .../src/main/java/org/apache/paimon/WriteMode.java |  0
 .../java/org/apache/paimon/format/FileFormat.java  |  8 +++++
 .../apache/paimon/format/FileFormatDiscover.java   | 11 +-----
 .../paimon/table/AbstractFileStoreTable.java       | 40 +++++++++++++++++++++-
 .../configuration/ConfigOptionsDocGenerator.java   |  2 +-
 6 files changed, 53 insertions(+), 42 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
similarity index 97%
rename from paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
rename to paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 4d16f7554..2cd255558 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -22,7 +22,6 @@ import 
org.apache.paimon.annotation.Documentation.ExcludeFromDocumentation;
 import org.apache.paimon.annotation.Documentation.Immutable;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.format.FileFormat;
-import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.MemorySize;
@@ -30,9 +29,7 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.options.description.DescribedEnum;
 import org.apache.paimon.options.description.Description;
 import org.apache.paimon.options.description.InlineElement;
-import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StringUtils;
 
 import java.io.Serializable;
@@ -834,7 +831,7 @@ public class CoreOptions implements Serializable {
     public static FileFormat createFileFormat(
             Options options, ConfigOption<FileFormatType> formatOption) {
         String formatIdentifier = options.get(formatOption).toString();
-        return FileFormatDiscover.getFileFormat(options, formatIdentifier);
+        return FileFormat.getFileFormat(options, formatIdentifier);
     }
 
     public Map<Integer, String> fileCompressionPerLevel() {
@@ -1115,39 +1112,16 @@ public class CoreOptions implements Serializable {
         return defaultValues;
     }
 
-    public List<CommitCallback> commitCallbacks() {
-        List<CommitCallback> result = new ArrayList<>();
+    public Map<String, String> commitCallbacks() {
+        Map<String, String> result = new HashMap<>();
         for (String className : options.get(COMMIT_CALLBACKS).split(",")) {
             className = className.trim();
             if (className.length() == 0) {
                 continue;
             }
 
-            Class<?> clazz;
-            try {
-                clazz = Class.forName(className, true, 
this.getClass().getClassLoader());
-            } catch (ClassNotFoundException e) {
-                throw new RuntimeException(e);
-            }
-            Preconditions.checkArgument(
-                    CommitCallback.class.isAssignableFrom(clazz),
-                    "Class " + clazz + " must implement " + 
CommitCallback.class);
             String param = 
options.get(COMMIT_CALLBACK_PARAM.key().replace("#", className));
-
-            try {
-                if (param == null) {
-                    result.add((CommitCallback) clazz.newInstance());
-                } else {
-                    result.add(
-                            (CommitCallback) 
clazz.getConstructor(String.class).newInstance(param));
-                }
-            } catch (Exception e) {
-                throw new RuntimeException(
-                        "Failed to initialize commit callback "
-                                + className
-                                + (param == null ? "" : " with param " + 
param),
-                        e);
-            }
+            result.put(className, param);
         }
         return result;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/WriteMode.java 
b/paimon-common/src/main/java/org/apache/paimon/WriteMode.java
similarity index 100%
rename from paimon-core/src/main/java/org/apache/paimon/WriteMode.java
rename to paimon-common/src/main/java/org/apache/paimon/WriteMode.java
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java 
b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
index bf7bc006f..48a5dc678 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.format;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.format.FileFormatFactory.FormatContext;
 import org.apache.paimon.options.Options;
@@ -114,4 +115,11 @@ public abstract class FileFormat {
 
         return Optional.empty();
     }
+
+    public static FileFormat getFileFormat(Options options, String 
formatIdentifier) {
+        int readBatchSize = options.get(CoreOptions.READ_BATCH_SIZE);
+        return FileFormat.fromIdentifier(
+                formatIdentifier,
+                new FormatContext(options.removePrefix(formatIdentifier + 
"."), readBatchSize));
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/format/FileFormatDiscover.java 
b/paimon-core/src/main/java/org/apache/paimon/format/FileFormatDiscover.java
index dd2a18e51..d0765efec 100644
--- a/paimon-core/src/main/java/org/apache/paimon/format/FileFormatDiscover.java
+++ b/paimon-core/src/main/java/org/apache/paimon/format/FileFormatDiscover.java
@@ -19,8 +19,6 @@
 package org.apache.paimon.format;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.format.FileFormatFactory.FormatContext;
-import org.apache.paimon.options.Options;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -38,17 +36,10 @@ public interface FileFormatDiscover {
             }
 
             private FileFormat create(String identifier) {
-                return getFileFormat(options.toConfiguration(), identifier);
+                return FileFormat.getFileFormat(options.toConfiguration(), 
identifier);
             }
         };
     }
 
     FileFormat discover(String identifier);
-
-    static FileFormat getFileFormat(Options options, String formatIdentifier) {
-        int readBatchSize = options.get(CoreOptions.READ_BATCH_SIZE);
-        return FileFormat.fromIdentifier(
-                formatIdentifier,
-                new FormatContext(options.removePrefix(formatIdentifier + 
"."), readBatchSize));
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 3dfeabc8b..35bb0049a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -53,6 +53,7 @@ import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
 import 
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
+import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
@@ -263,13 +264,50 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
     }
 
     private List<CommitCallback> createCommitCallbacks() {
-        List<CommitCallback> callbacks = new 
ArrayList<>(coreOptions().commitCallbacks());
+        List<CommitCallback> callbacks = new 
ArrayList<>(loadCommitCallbacks());
         if (coreOptions().partitionedTableInMetastore() && 
metastoreClientFactory != null) {
             callbacks.add(new 
AddPartitionCommitCallback(metastoreClientFactory.create()));
         }
         return callbacks;
     }
 
+    private List<CommitCallback> loadCommitCallbacks() {
+        List<CommitCallback> result = new ArrayList<>();
+
+        Map<String, String> clazzParamMaps = coreOptions().commitCallbacks();
+        for (Map.Entry<String, String> classParamEntry : 
clazzParamMaps.entrySet()) {
+            String className = classParamEntry.getKey();
+            String param = classParamEntry.getValue();
+
+            Class<?> clazz;
+            try {
+                clazz = Class.forName(className, true, 
this.getClass().getClassLoader());
+            } catch (ClassNotFoundException e) {
+                throw new RuntimeException(e);
+            }
+
+            Preconditions.checkArgument(
+                    CommitCallback.class.isAssignableFrom(clazz),
+                    "Class " + clazz + " must implement " + 
CommitCallback.class);
+
+            try {
+                if (param == null) {
+                    result.add((CommitCallback) clazz.newInstance());
+                } else {
+                    result.add(
+                            (CommitCallback) 
clazz.getConstructor(String.class).newInstance(param));
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(
+                        "Failed to initialize commit callback "
+                                + className
+                                + (param == null ? "" : " with param " + 
param),
+                        e);
+            }
+        }
+        return result;
+    }
+
     private Optional<TableSchema> tryTimeTravel(Options options) {
         CoreOptions coreOptions = new CoreOptions(options);
 
diff --git 
a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
 
b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
index f26f3f4af..b1c53b538 100644
--- 
a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
+++ 
b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
@@ -73,7 +73,7 @@ public class ConfigOptionsDocGenerator {
     static final OptionsClassLocation[] LOCATIONS =
             new OptionsClassLocation[] {
                 new OptionsClassLocation("paimon-common", 
"org.apache.paimon.options"),
-                new OptionsClassLocation("paimon-core", "org.apache.paimon"),
+                new OptionsClassLocation("paimon-common", "org.apache.paimon"),
                 new OptionsClassLocation(
                         "paimon-flink/paimon-flink-common", 
"org.apache.paimon.flink"),
                 new OptionsClassLocation(

Reply via email to