This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a6b2c6c0c [core] migrate CoreOptions to common module
a6b2c6c0c is described below
commit a6b2c6c0c9b6da3de8cda542dbbb46513fea3d76
Author: wgcn <[email protected]>
AuthorDate: Sat Jul 15 16:10:50 2023 +0800
[core] migrate CoreOptions to common module
This closes #1569
---
.../main/java/org/apache/paimon/CoreOptions.java | 34 +++---------------
.../src/main/java/org/apache/paimon/WriteMode.java | 0
.../java/org/apache/paimon/format/FileFormat.java | 8 +++++
.../apache/paimon/format/FileFormatDiscover.java | 11 +-----
.../paimon/table/AbstractFileStoreTable.java | 40 +++++++++++++++++++++-
.../configuration/ConfigOptionsDocGenerator.java | 2 +-
6 files changed, 53 insertions(+), 42 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
similarity index 97%
rename from paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
rename to paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 4d16f7554..2cd255558 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -22,7 +22,6 @@ import
org.apache.paimon.annotation.Documentation.ExcludeFromDocumentation;
import org.apache.paimon.annotation.Documentation.Immutable;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.format.FileFormat;
-import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.MemorySize;
@@ -30,9 +29,7 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.options.description.DescribedEnum;
import org.apache.paimon.options.description.Description;
import org.apache.paimon.options.description.InlineElement;
-import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
import java.io.Serializable;
@@ -834,7 +831,7 @@ public class CoreOptions implements Serializable {
public static FileFormat createFileFormat(
Options options, ConfigOption<FileFormatType> formatOption) {
String formatIdentifier = options.get(formatOption).toString();
- return FileFormatDiscover.getFileFormat(options, formatIdentifier);
+ return FileFormat.getFileFormat(options, formatIdentifier);
}
public Map<Integer, String> fileCompressionPerLevel() {
@@ -1115,39 +1112,16 @@ public class CoreOptions implements Serializable {
return defaultValues;
}
- public List<CommitCallback> commitCallbacks() {
- List<CommitCallback> result = new ArrayList<>();
+ public Map<String, String> commitCallbacks() {
+ Map<String, String> result = new HashMap<>();
for (String className : options.get(COMMIT_CALLBACKS).split(",")) {
className = className.trim();
if (className.length() == 0) {
continue;
}
- Class<?> clazz;
- try {
- clazz = Class.forName(className, true,
this.getClass().getClassLoader());
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
- Preconditions.checkArgument(
- CommitCallback.class.isAssignableFrom(clazz),
- "Class " + clazz + " must implement " +
CommitCallback.class);
String param =
options.get(COMMIT_CALLBACK_PARAM.key().replace("#", className));
-
- try {
- if (param == null) {
- result.add((CommitCallback) clazz.newInstance());
- } else {
- result.add(
- (CommitCallback)
clazz.getConstructor(String.class).newInstance(param));
- }
- } catch (Exception e) {
- throw new RuntimeException(
- "Failed to initialize commit callback "
- + className
- + (param == null ? "" : " with param " +
param),
- e);
- }
+ result.put(className, param);
}
return result;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/WriteMode.java
b/paimon-common/src/main/java/org/apache/paimon/WriteMode.java
similarity index 100%
rename from paimon-core/src/main/java/org/apache/paimon/WriteMode.java
rename to paimon-common/src/main/java/org/apache/paimon/WriteMode.java
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
index bf7bc006f..48a5dc678 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
@@ -18,6 +18,7 @@
package org.apache.paimon.format;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.format.FileFormatFactory.FormatContext;
import org.apache.paimon.options.Options;
@@ -114,4 +115,11 @@ public abstract class FileFormat {
return Optional.empty();
}
+
+ public static FileFormat getFileFormat(Options options, String
formatIdentifier) {
+ int readBatchSize = options.get(CoreOptions.READ_BATCH_SIZE);
+ return FileFormat.fromIdentifier(
+ formatIdentifier,
+ new FormatContext(options.removePrefix(formatIdentifier +
"."), readBatchSize));
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/format/FileFormatDiscover.java
b/paimon-core/src/main/java/org/apache/paimon/format/FileFormatDiscover.java
index dd2a18e51..d0765efec 100644
--- a/paimon-core/src/main/java/org/apache/paimon/format/FileFormatDiscover.java
+++ b/paimon-core/src/main/java/org/apache/paimon/format/FileFormatDiscover.java
@@ -19,8 +19,6 @@
package org.apache.paimon.format;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.format.FileFormatFactory.FormatContext;
-import org.apache.paimon.options.Options;
import java.util.HashMap;
import java.util.Map;
@@ -38,17 +36,10 @@ public interface FileFormatDiscover {
}
private FileFormat create(String identifier) {
- return getFileFormat(options.toConfiguration(), identifier);
+ return FileFormat.getFileFormat(options.toConfiguration(),
identifier);
}
};
}
FileFormat discover(String identifier);
-
- static FileFormat getFileFormat(Options options, String formatIdentifier) {
- int readBatchSize = options.get(CoreOptions.READ_BATCH_SIZE);
- return FileFormat.fromIdentifier(
- formatIdentifier,
- new FormatContext(options.removePrefix(formatIdentifier +
"."), readBatchSize));
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 3dfeabc8b..35bb0049a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -53,6 +53,7 @@ import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
import
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
+import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
@@ -263,13 +264,50 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
}
private List<CommitCallback> createCommitCallbacks() {
- List<CommitCallback> callbacks = new
ArrayList<>(coreOptions().commitCallbacks());
+ List<CommitCallback> callbacks = new
ArrayList<>(loadCommitCallbacks());
if (coreOptions().partitionedTableInMetastore() &&
metastoreClientFactory != null) {
callbacks.add(new
AddPartitionCommitCallback(metastoreClientFactory.create()));
}
return callbacks;
}
+ private List<CommitCallback> loadCommitCallbacks() {
+ List<CommitCallback> result = new ArrayList<>();
+
+ Map<String, String> clazzParamMaps = coreOptions().commitCallbacks();
+ for (Map.Entry<String, String> classParamEntry :
clazzParamMaps.entrySet()) {
+ String className = classParamEntry.getKey();
+ String param = classParamEntry.getValue();
+
+ Class<?> clazz;
+ try {
+ clazz = Class.forName(className, true,
this.getClass().getClassLoader());
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+
+ Preconditions.checkArgument(
+ CommitCallback.class.isAssignableFrom(clazz),
+ "Class " + clazz + " must implement " +
CommitCallback.class);
+
+ try {
+ if (param == null) {
+ result.add((CommitCallback) clazz.newInstance());
+ } else {
+ result.add(
+ (CommitCallback)
clazz.getConstructor(String.class).newInstance(param));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to initialize commit callback "
+ + className
+ + (param == null ? "" : " with param " +
param),
+ e);
+ }
+ }
+ return result;
+ }
+
private Optional<TableSchema> tryTimeTravel(Options options) {
CoreOptions coreOptions = new CoreOptions(options);
diff --git
a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
index f26f3f4af..b1c53b538 100644
---
a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
+++
b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
@@ -73,7 +73,7 @@ public class ConfigOptionsDocGenerator {
static final OptionsClassLocation[] LOCATIONS =
new OptionsClassLocation[] {
new OptionsClassLocation("paimon-common",
"org.apache.paimon.options"),
- new OptionsClassLocation("paimon-core", "org.apache.paimon"),
+ new OptionsClassLocation("paimon-common", "org.apache.paimon"),
new OptionsClassLocation(
"paimon-flink/paimon-flink-common",
"org.apache.paimon.flink"),
new OptionsClassLocation(