This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e3001207c8 [Feature][Connector-V2][Iceberg] Support Iceberg Kerberos
(#7246)
e3001207c8 is described below
commit e3001207c887f73cd44648b4adb79173572bb4a9
Author: 卢宗柱 <[email protected]>
AuthorDate: Wed Aug 14 22:34:47 2024 +0800
[Feature][Connector-V2][Iceberg] Support Iceberg Kerberos (#7246)
---
.../seatunnel/api/kerberos/KerberosConfig.java | 43 ++++++++++++++
.../common/exception/CommonErrorCode.java | 3 +-
.../seatunnel/file/config/BaseSinkConfig.java | 22 +------
.../seatunnel/iceberg/IcebergCatalogLoader.java | 67 +++++++++++++++++++++-
.../iceberg/catalog/IcebergCatalogFactory.java | 7 ++-
.../seatunnel/iceberg/config/CommonConfig.java | 18 +++++-
.../seatunnel/iceberg/data/IcebergTypeMapper.java | 1 -
.../seatunnel/iceberg/sink/IcebergSinkFactory.java | 3 +
.../source/enumerator/AbstractSplitEnumerator.java | 2 +-
.../iceberg/source/reader/IcebergSourceReader.java | 2 +-
.../seatunnel/iceberg/utils/SchemaUtils.java | 11 ++--
.../iceberg/catalog/IcebergCatalogTest.java | 7 +++
12 files changed, 150 insertions(+), 36 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/kerberos/KerberosConfig.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/kerberos/KerberosConfig.java
new file mode 100644
index 0000000000..d501a3ea49
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/kerberos/KerberosConfig.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.kerberos;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+/**
+ * Option definitions for Kerberos authentication settings: the user principal, the krb5
+ * configuration file path and the keytab file path.
+ */
+public class KerberosConfig {
+
+ /** String option {@code kerberos_principal}: the Kerberos user principal. No default value. */
+ public static final Option<String> KERBEROS_PRINCIPAL =
+ Options.key("kerberos_principal")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("When use kerberos, we should set
kerberos user principal");
+
+ /**
+ * String option {@code krb5_path}: path of the krb5 configuration file. Defaults to
+ * {@code /etc/krb5.conf}.
+ */
+ public static final Option<String> KRB5_PATH =
+ Options.key("krb5_path")
+ .stringType()
+ .defaultValue("/etc/krb5.conf")
+ .withDescription(
+ "When use kerberos, we should set krb5 path file
path such as '/seatunnel/krb5.conf' or use the default path '/etc/krb5.conf'");
+
+ /** String option {@code kerberos_keytab_path}: path of the keytab file. No default value. */
+ public static final Option<String> KERBEROS_KEYTAB_PATH =
+ Options.key("kerberos_keytab_path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("When using kerberos, We should specify
the keytab path");
+}
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
index 79621c4216..99cb7353cf 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
@@ -49,11 +49,9 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
WRITE_SEATUNNEL_ROW_ERROR(
"COMMON-23",
"<connector> write SeaTunnelRow failed, the SeaTunnelRow value is
'<seaTunnelRow>'."),
-
SQL_TEMPLATE_HANDLED_ERROR(
"COMMON-24",
"The table of <tableName> has no <keyName>, but the template \n
<template> \n which has the place holder named <placeholder>. Please use the
option named <optionName> to specify sql template"),
-
VERSION_NOT_SUPPORTED("COMMON-25", "<identifier> <version> is
unsupported."),
OPERATION_NOT_SUPPORTED("COMMON-26", "<identifier> <operation> is
unsupported."),
CONVERT_TO_SEATUNNEL_PROPS_BLANK_ERROR(
@@ -78,6 +76,7 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
"COMMON-33",
"The datetime format '<datetime>' of field '<field>' is not
supported. Please check the datetime format."),
UNSUPPORTED_METHOD("COMMON-34", "'<identifier>' unsupported the method
'<methodName>'"),
+ KERBEROS_AUTHORIZED_FAILED("COMMON-35", "Kerberos authorized failed"),
;
private final String code;
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
index 0759baf9e4..35f1e4ba16 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.file.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.kerberos.KerberosConfig;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.common.utils.DateTimeUtils;
@@ -34,7 +35,7 @@ import static
org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
import static
org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
-public class BaseSinkConfig {
+public class BaseSinkConfig extends KerberosConfig {
public static final String SEATUNNEL = "seatunnel";
public static final String NON_PARTITION = "NON_PARTITION";
public static final String TRANSACTION_ID_SPLIT = "_";
@@ -228,25 +229,6 @@ public class BaseSinkConfig {
.noDefaultValue()
.withDescription("The remote user name of hdfs");
- public static final Option<String> KRB5_PATH =
- Options.key("krb5_path")
- .stringType()
- .defaultValue("/etc/krb5.conf")
- .withDescription(
- "When use kerberos, we should set krb5 path file
path such as '/seatunnel/krb5.conf' or use the default path '/etc/krb5.conf");
-
- public static final Option<String> KERBEROS_PRINCIPAL =
- Options.key("kerberos_principal")
- .stringType()
- .noDefaultValue()
- .withDescription("Kerberos principal");
-
- public static final Option<String> KERBEROS_KEYTAB_PATH =
- Options.key("kerberos_keytab_path")
- .stringType()
- .noDefaultValue()
- .withDescription("Kerberos keytab file path");
-
public static final Option<Integer> MAX_ROWS_IN_MEMORY =
Options.key("max_rows_in_memory")
.intType()
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogLoader.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogLoader.java
index 63596c88be..0f4610783a 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogLoader.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogLoader.java
@@ -19,14 +19,21 @@ package org.apache.seatunnel.connectors.seatunnel.iceberg;
import org.apache.seatunnel.shade.com.google.common.collect.ImmutableList;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig;
+import
org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.common.DynClasses;
import org.apache.iceberg.common.DynMethods;
import lombok.extern.slf4j.Slf4j;
+import sun.security.krb5.KrbException;
import java.io.IOException;
import java.io.Serializable;
@@ -62,7 +69,7 @@ public class IcebergCatalogLoader implements Serializable {
* @param config
* @return
*/
- private Object loadHadoopConfig(CommonConfig config) {
+ public Object loadHadoopConfig(CommonConfig config) {
Class<?> configClass =
DynClasses.builder()
.impl("org.apache.hadoop.hdfs.HdfsConfiguration")
@@ -80,7 +87,6 @@ public class IcebergCatalogLoader implements Serializable {
log.info("Hadoop not found on classpath, not creating Hadoop
config");
return null;
}
-
try {
Object result = configClass.getDeclaredConstructor().newInstance();
DynMethods.BoundMethod addResourceMethod =
@@ -109,6 +115,8 @@ public class IcebergCatalogLoader implements Serializable {
});
}
config.getHadoopProps().forEach(setMethod::invoke);
+ // kerberos authentication
+ doKerberosLogin((Configuration) result);
log.info("Hadoop config initialized: {}", configClass.getName());
return result;
} catch (InstantiationException
@@ -121,4 +129,59 @@ public class IcebergCatalogLoader implements Serializable {
}
return null;
}
+
+    /**
+     * Performs a Kerberos keytab login when Kerberos is fully configured.
+     *
+     * <p>The principal, krb5 configuration path and keytab path are read from this loader's
+     * {@code config}. Only when all three are non-empty are the {@code java.security.krb5.conf}
+     * and {@code krb.principal} system properties set and
+     * {@link #doKerberosAuthentication(Configuration, String, String)} invoked; otherwise the
+     * login is skipped and a warning is logged.
+     *
+     * @param configuration Hadoop configuration handed to the login
+     * @return the same {@code configuration} instance that was passed in
+     * @throws IcebergConnectorException with {@code KERBEROS_AUTHORIZED_FAILED} when the login
+     *     fails; the original exception is attached as the cause
+     */
+    private Configuration doKerberosLogin(Configuration configuration) {
+        String kerberosKrb5ConfPath = config.getKerberosKrb5ConfPath();
+        String kerberosKeytabPath = config.getKerberosKeytabPath();
+        String kerberosPrincipal = config.getKerberosPrincipal();
+
+        boolean kerberosConfigured =
+                StringUtils.isNotEmpty(kerberosPrincipal)
+                        && StringUtils.isNotEmpty(kerberosKrb5ConfPath)
+                        && StringUtils.isNotEmpty(kerberosKeytabPath);
+        if (!kerberosConfigured) {
+            log.warn(
+                    "Kerberos authentication is not configured, it will skip kerberos authentication");
+            return configuration;
+        }
+
+        try {
+            System.setProperty("java.security.krb5.conf", kerberosKrb5ConfPath);
+            System.setProperty("krb.principal", kerberosPrincipal);
+            doKerberosAuthentication(configuration, kerberosPrincipal, kerberosKeytabPath);
+        } catch (Exception e) {
+            // Pass the original exception as the cause so its stack trace is not lost;
+            // previously only e.getMessage() survived.
+            throw new IcebergConnectorException(
+                    CommonErrorCode.KERBEROS_AUTHORIZED_FAILED,
+                    String.format("Kerberos authentication failed: %s", e.getMessage()),
+                    e);
+        }
+
+        return configuration;
+    }
+
+ public static void doKerberosAuthentication(
+ Configuration configuration, String principal, String keytabPath) {
+ if (StringUtils.isBlank(principal) || StringUtils.isBlank(keytabPath))
{
+ log.warn(
+ "Principal [{}] or keytabPath [{}] is empty, it will skip
kerberos authentication",
+ principal,
+ keytabPath);
+ } else {
+ configuration.set("hadoop.security.authentication", "kerberos");
+ UserGroupInformation.setConfiguration(configuration);
+ try {
+ log.info(
+ "Start Kerberos authentication using principal {} and
keytab {}",
+ principal,
+ keytabPath);
+ sun.security.krb5.Config.refresh();
+ UserGroupInformation.loginUserFromKeytab(principal,
keytabPath);
+ UserGroupInformation loginUser =
UserGroupInformation.getLoginUser();
+ log.info("Kerberos authentication successful,UGI {}",
loginUser);
+ } catch (IOException | KrbException e) {
+ throw new SeaTunnelException("check connectivity failed, " +
e.getMessage(), e);
+ }
+ }
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogFactory.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogFactory.java
index 1259699068..182286e75f 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogFactory.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogFactory.java
@@ -48,7 +48,12 @@ public class IcebergCatalogFactory implements CatalogFactory
{
CommonConfig.KEY_NAMESPACE,
CommonConfig.KEY_TABLE,
CommonConfig.CATALOG_PROPS)
- .optional(CommonConfig.HADOOP_PROPS, KEY_CASE_SENSITIVE)
+ .optional(
+ CommonConfig.HADOOP_PROPS,
+ CommonConfig.KERBEROS_PRINCIPAL,
+ CommonConfig.KERBEROS_KEYTAB_PATH,
+ CommonConfig.KRB5_PATH,
+ KEY_CASE_SENSITIVE)
.build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/CommonConfig.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/CommonConfig.java
index a7503e6e30..ce76f5a512 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/CommonConfig.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/CommonConfig.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.iceberg.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.kerberos.KerberosConfig;
import org.apache.seatunnel.common.config.ConfigRuntimeException;
import lombok.Getter;
@@ -33,7 +34,7 @@ import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.ch
@Getter
@ToString
-public class CommonConfig implements Serializable {
+public class CommonConfig extends KerberosConfig implements Serializable {
private static final long serialVersionUID = 239821141534421580L;
public static final Option<String> KEY_CATALOG_NAME =
@@ -89,6 +90,12 @@ public class CommonConfig implements Serializable {
private Map<String, String> hadoopProps;
private String hadoopConfPath;
+ // kerberos
+
+ private String kerberosPrincipal;
+ private String kerberosKeytabPath;
+ private String kerberosKrb5ConfPath;
+
public CommonConfig(ReadonlyConfig pluginConfig) {
this.catalogName =
checkArgumentNotNull(pluginConfig.get(KEY_CATALOG_NAME));
this.namespace = pluginConfig.get(KEY_NAMESPACE);
@@ -99,6 +106,15 @@ public class CommonConfig implements Serializable {
if (pluginConfig.toConfig().hasPath(KEY_CASE_SENSITIVE.key())) {
this.caseSensitive = pluginConfig.get(KEY_CASE_SENSITIVE);
}
+ if (pluginConfig.getOptional(KERBEROS_PRINCIPAL).isPresent()) {
+ this.kerberosPrincipal =
pluginConfig.getOptional(KERBEROS_PRINCIPAL).get();
+ }
+ if (pluginConfig.getOptional(KRB5_PATH).isPresent()) {
+ this.kerberosKrb5ConfPath =
pluginConfig.getOptional(KRB5_PATH).get();
+ }
+ if (pluginConfig.getOptional(KERBEROS_KEYTAB_PATH).isPresent()) {
+ this.kerberosKeytabPath =
pluginConfig.getOptional(KERBEROS_KEYTAB_PATH).get();
+ }
validate();
}
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java
index e1635919d6..4f3f57e415 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java
@@ -37,7 +37,6 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
public class IcebergTypeMapper {
-
public static SeaTunnelDataType<?> mapping(String field, @NonNull Type
icebergType) {
switch (icebergType.typeId()) {
case BOOLEAN:
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java
index 212bb6371d..47cfa33108 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java
@@ -52,6 +52,9 @@ public class IcebergSinkFactory implements TableSinkFactory {
.optional(
SinkConfig.TABLE_PROPS,
SinkConfig.HADOOP_PROPS,
+ SinkConfig.KERBEROS_PRINCIPAL,
+ SinkConfig.KERBEROS_KEYTAB_PATH,
+ SinkConfig.KRB5_PATH,
SinkConfig.WRITE_PROPS,
SinkConfig.AUTO_CREATE_PROPS,
SinkConfig.TABLE_PRIMARY_KEYS,
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
index ef3beacd26..73cb71f45f 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
@@ -42,7 +42,7 @@ import java.util.Set;
public abstract class AbstractSplitEnumerator
implements SourceSplitEnumerator<IcebergFileScanTaskSplit,
IcebergSplitEnumeratorState> {
- protected final SourceSplitEnumerator.Context<IcebergFileScanTaskSplit>
context;
+ protected final Context<IcebergFileScanTaskSplit> context;
protected final SourceConfig sourceConfig;
protected final Map<Integer, List<IcebergFileScanTaskSplit>> pendingSplits;
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java
index ebc75bae9b..83f42879d0 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java
@@ -46,7 +46,7 @@ public class IcebergSourceReader implements
SourceReader<SeaTunnelRow, IcebergFi
private static final long POLL_WAIT_MS = 1000;
- private final SourceReader.Context context;
+ private final Context context;
private final Queue<IcebergFileScanTaskSplit> pendingSplits;
private final Deserializer deserializer;
private final Schema tableSchema;
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
index 9d56072c92..6c99eb409c 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
@@ -274,7 +274,7 @@ public class SchemaUtils {
log.info("Schema for table {} updated with new columns", table.name());
}
- private static boolean columnExists(org.apache.iceberg.Schema schema,
SchemaAddColumn update) {
+ private static boolean columnExists(Schema schema, SchemaAddColumn update)
{
Types.StructType struct =
update.parentName() == null
? schema.asStruct()
@@ -282,13 +282,11 @@ public class SchemaUtils {
return struct.field(update.name()) != null;
}
- private static boolean typeMatches(
- org.apache.iceberg.Schema schema, SchemaModifyColumn update) {
+ private static boolean typeMatches(Schema schema, SchemaModifyColumn
update) {
return schema.findType(update.name()).typeId() ==
update.type().typeId();
}
- private static boolean findColumns(
- org.apache.iceberg.Schema schema, SchemaDeleteColumn deleteColumn)
{
+ private static boolean findColumns(Schema schema, SchemaDeleteColumn
deleteColumn) {
return schema.findField(deleteColumn.name()) != null;
}
@@ -300,8 +298,7 @@ public class SchemaUtils {
return IcebergTypeMapper.toIcebergType(rowType);
}
- public static PartitionSpec createPartitionSpec(
- org.apache.iceberg.Schema schema, List<String> partitionBy) {
+ public static PartitionSpec createPartitionSpec(Schema schema,
List<String> partitionBy) {
if (partitionBy.isEmpty()) {
return PartitionSpec.unpartitioned();
}
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java
b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java
index 777f5a38e9..54aa3326de 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java
@@ -75,6 +75,13 @@ class IcebergCatalogTest {
configs.put(CommonConfig.KEY_CATALOG_NAME.key(), CATALOG_NAME);
configs.put(CommonConfig.CATALOG_PROPS.key(), catalogProps);
configs.put(SinkConfig.TABLE_DEFAULT_PARTITION_KEYS.key(), "dt_col");
+ // hadoop config directory
+ configs.put(CommonConfig.HADOOP_CONF_PATH_PROP.key(),
"/tmp/hadoop/conf");
+ // hadoop kerberos config
+ configs.put(CommonConfig.KERBEROS_PRINCIPAL.key(),
"hive/[email protected]");
+ configs.put(
+ CommonConfig.KERBEROS_KEYTAB_PATH.key(),
"/tmp/hadoop/conf/hive.service.keytab");
+ configs.put(CommonConfig.KRB5_PATH.key(),
"/tmp/hadoop/conf/krb5.conf");
icebergCatalog = new IcebergCatalog(CATALOG_NAME,
ReadonlyConfig.fromMap(configs));
icebergCatalog.open();
}