This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new fd4cec3a9 [Feature][Connector V2] expose configurable options in Hudi
(#3383)
fd4cec3a9 is described below
commit fd4cec3a95b0c36dba640a451f402eae403a46ff
Author: Cason-ACE <[email protected]>
AuthorDate: Mon Dec 19 16:04:11 2022 +0800
[Feature][Connector V2] expose configurable options in Hudi (#3383)
* add expose configurable options in Hudi
* fix codeStyle
[Feature][Connector V2]expose configurable options in Hudi
* [Feature][Connector V2]expose configurable options in Hudi
fix use.kerberos type from string to boolean
fix use.kerberos default with false
* [Feature][Connector V2]expose configurable options in Hudi
fix optionRule;
Update Hudi Connector Doc About Config;
* [Feature][Connector V2]expose configurable options in Hudi
fix table.type Description
* [Feature][Connector V2]expose configurable options in Hudi
fix codeStyle;
Avoid Using .*;
* [Feature][Connector V2]expose configurable options in Hudi
Remove Option : KERBEROS_PRINCIPAL,KERBEROS_PRINCIPAL_FILE From
optional In optionRule
* [Feature][Connector V2]expose configurable options in Hudi
fix codeStyle
* [Feature][Connector V2]expose configurable options in Hudi
Add UT for Hudi option rule
* [Feature][Connector V2]expose
Resolve Conflicts and fix comment text
* [Feature][Connector V2] Hudi Fix CodeStyle
---
docs/en/connector-v2/source/Hudi.md | 18 ++++----
.../seatunnel/hudi/config/HudiSourceConfig.java | 50 ++++++++++++++++------
.../seatunnel/hudi/source/HudiSource.java | 20 ++++-----
.../seatunnel/hudi/source/HudiSourceFactory.java | 43 +++++++++++++++++++
.../seatunnel/hudi/HudiFactoryTest.java} | 21 ++++-----
5 files changed, 109 insertions(+), 43 deletions(-)
diff --git a/docs/en/connector-v2/source/Hudi.md
b/docs/en/connector-v2/source/Hudi.md
index 0880ca460..5d059cb90 100644
--- a/docs/en/connector-v2/source/Hudi.md
+++ b/docs/en/connector-v2/source/Hudi.md
@@ -22,15 +22,15 @@ Currently, only supports hudi cow table and Snapshot Query
with Batch Mode
## Options
-| name | type | required | default value |
-| ----------------------- | ------- | -------- | ------------- |
-| table.path | string | yes | - |
-| table.type | string | yes | - |
-| conf.files | string | yes | - |
-| use.kerberos | boolean | no | false |
-| kerberos.principal | string | no | - |
-| kerberos.principal.file | string | no | - |
-| common-options | | no | - |
+| name | type | required | default
value |
+| ----------------------- |---------|------------------------------|
------------- |
+| table.path | string | yes | -
|
+| table.type | string | yes | -
|
+| conf.files | string | yes | -
|
+| use.kerberos | boolean | no | false
|
+| kerberos.principal | string | yes when use.kerberos = true | -
|
+| kerberos.principal.file | string | yes when use.kerberos = true | -
|
+| common-options | config | no | -
|
### table.path [string]
diff --git
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSourceConfig.java
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSourceConfig.java
index 9f1b86e94..2f75a50ca 100644
---
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSourceConfig.java
@@ -17,18 +17,44 @@
package org.apache.seatunnel.connectors.seatunnel.hudi.config;
-public class HudiSourceConfig {
-
- public static final String TABLE_PATH = "table.path";
-
- public static final String TABLE_TYPE = "table.type";
-
- public static final String CONF_FILES = "conf.files";
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
- public static final String USE_KERBEROS = "use.kerberos";
-
- public static final String KERBEROS_PRINCIPAL = "kerberos.principal";
-
- public static final String KERBEROS_PRINCIPAL_FILE =
"kerberos.principal.file";
+public class HudiSourceConfig {
+ public static final Option<String> TABLE_PATH =
+ Options.key("table.path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("hudi table path");
+
+ public static final Option<String> TABLE_TYPE =
+ Options.key("table.type")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("hudi table type. default hudi table type
is cow. mor is not support yet");
+
+ public static final Option<String> CONF_FILES =
+ Options.key("conf.files")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("hudi conf files ");
+
+ public static final Option<Boolean> USE_KERBEROS =
+ Options.key("use.kerberos")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("hudi use.kerberos");
+
+ public static final Option<String> KERBEROS_PRINCIPAL =
+ Options.key("kerberos.principal")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("hudi kerberos.principal");
+
+ public static final Option<String> KERBEROS_PRINCIPAL_FILE =
+ Options.key("kerberos.principal.file")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("hudi kerberos.principal.file ");
}
diff --git
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
index 4b2f11e75..46edd20a6 100644
---
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
+++
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
@@ -65,36 +65,36 @@ public class HudiSource implements
SeaTunnelSource<SeaTunnelRow, HudiSourceSplit
@Override
public void prepare(Config pluginConfig) {
- CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
TABLE_PATH, CONF_FILES);
+ CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
TABLE_PATH.key(), CONF_FILES.key());
if (!result.isSuccess()) {
throw new
HudiConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format("PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg())
);
}
- // default hudi table tupe is cow
+ // default hudi table type is cow
// TODO: support hudi mor table
// TODO: support Incremental Query and Read Optimized Query
- if (!"cow".equalsIgnoreCase(pluginConfig.getString(TABLE_TYPE))) {
+ if (!"cow".equalsIgnoreCase(pluginConfig.getString(TABLE_TYPE.key())))
{
throw new
HudiConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format("PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, "Do not support hudi
mor table yet!")
);
}
try {
- this.confFiles = pluginConfig.getString(CONF_FILES);
- this.tablePath = pluginConfig.getString(TABLE_PATH);
- if (CheckConfigUtil.isValidParam(pluginConfig, USE_KERBEROS)) {
- this.useKerberos = pluginConfig.getBoolean(USE_KERBEROS);
+ this.confFiles = pluginConfig.getString(CONF_FILES.key());
+ this.tablePath = pluginConfig.getString(TABLE_PATH.key());
+ if (CheckConfigUtil.isValidParam(pluginConfig,
USE_KERBEROS.key())) {
+ this.useKerberos = pluginConfig.getBoolean(USE_KERBEROS.key());
if (this.useKerberos) {
- CheckResult kerberosCheckResult =
CheckConfigUtil.checkAllExists(pluginConfig, KERBEROS_PRINCIPAL,
KERBEROS_PRINCIPAL_FILE);
+ CheckResult kerberosCheckResult =
CheckConfigUtil.checkAllExists(pluginConfig, KERBEROS_PRINCIPAL.key(),
KERBEROS_PRINCIPAL_FILE.key());
if (!kerberosCheckResult.isSuccess()) {
throw new
HudiConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format("PluginName: %s, PluginType: %s,
Message: %s",
getPluginName(), PluginType.SOURCE,
result.getMsg())
);
}
-
HudiUtil.initKerberosAuthentication(HudiUtil.getConfiguration(this.confFiles),
pluginConfig.getString(KERBEROS_PRINCIPAL),
pluginConfig.getString(KERBEROS_PRINCIPAL_FILE));
+
HudiUtil.initKerberosAuthentication(HudiUtil.getConfiguration(this.confFiles),
pluginConfig.getString(KERBEROS_PRINCIPAL.key()),
pluginConfig.getString(KERBEROS_PRINCIPAL_FILE.key()));
}
}
this.filePath = HudiUtil.getParquetFileByPath(this.confFiles,
tablePath);
@@ -102,7 +102,7 @@ public class HudiSource implements
SeaTunnelSource<SeaTunnelRow, HudiSourceSplit
throw new
HudiConnectorException(CommonErrorCode.FILE_OPERATION_FAILED,
String.format("%s has no parquet file, please check!",
tablePath));
}
- // should read from config or read from hudi metadata( wait catlog
done)
+ // should read from config or read from hudi metadata( wait
catalog done)
this.typeInfo = HudiUtil.getSeaTunnelRowTypeInfo(this.confFiles,
this.filePath);
} catch (HudiConnectorException | IOException e) {
throw new
HudiConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
diff --git
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceFactory.java
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceFactory.java
new file mode 100644
index 000000000..6a0508395
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hudi.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelSource.class)
+public class HudiSourceFactory implements TableSourceFactory {
+
+ @Override
+ public String factoryIdentifier() {
+ return "Hudi";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(HudiSourceConfig.TABLE_PATH,
HudiSourceConfig.TABLE_TYPE, HudiSourceConfig.CONF_FILES)
+ .optional(HudiSourceConfig.USE_KERBEROS)
+ .conditional(HudiSourceConfig.USE_KERBEROS, true,
HudiSourceConfig.KERBEROS_PRINCIPAL, HudiSourceConfig.KERBEROS_PRINCIPAL_FILE)
+ .build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSourceConfig.java
b/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiFactoryTest.java
similarity index 62%
copy from
seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSourceConfig.java
copy to
seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiFactoryTest.java
index 9f1b86e94..d9499aa86 100644
---
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiFactoryTest.java
@@ -15,20 +15,17 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.hudi.config;
+package org.apache.seatunnel.connectors.seatunnel.hudi;
-public class HudiSourceConfig {
+import org.apache.seatunnel.connectors.seatunnel.hudi.source.HudiSourceFactory;
- public static final String TABLE_PATH = "table.path";
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
- public static final String TABLE_TYPE = "table.type";
-
- public static final String CONF_FILES = "conf.files";
-
- public static final String USE_KERBEROS = "use.kerberos";
-
- public static final String KERBEROS_PRINCIPAL = "kerberos.principal";
-
- public static final String KERBEROS_PRINCIPAL_FILE =
"kerberos.principal.file";
+class HudiFactoryTest {
+ @Test
+ void optionRule() {
+ Assertions.assertNotNull((new HudiSourceFactory()).optionRule());
+ }
}