liucongjy commented on code in PR #9722:
URL: https://github.com/apache/seatunnel/pull/9722#discussion_r2285031707
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonWithS3IT.java:
##########
@@ -139,4 +167,99 @@ public void
testFakeCDCSinkPaimonWithCheckpointInBatchModeWithS3Filesystem() thr
Container.ExecResult readResult =
executeJob("/fake_2_paimon_with_s3_to_assert.conf");
Assertions.assertEquals(0, readResult.getExitCode());
}
+
+ private void initPrivilege() {
+ org.apache.paimon.options.Options catalogOptions = new
org.apache.paimon.options.Options();
+ catalogOptions.set(PaimonBaseOptions.WAREHOUSE.key(), warehouse);
+ catalogOptions.set("fs.s3a.endpoint", container.getS3URL());
+ catalogOptions.set("fs.s3a.access-key", MINIO_USER_NAME);
+ catalogOptions.set("fs.s3a.secret-key", MINIO_USER_PASSWORD);
+ catalogOptions.set("fs.s3a.buffer.dir", "/tmp/s3abuffer");
+ catalogOptions.set("fs.s3a.change.detection.mode", "NONE");
+ catalogOptions.set("fs.s3a.change.detection.version.required",
"false");
+ catalogOptions.set("fs.s3a.path.style.access", "true");
+ catalogOptions.set(
+ "fs.s3a.aws.credentials.provider",
+ "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
+ final CatalogContext catalogContext =
CatalogContext.create(catalogOptions);
+
+ FileIO fileIO = new ResolvingFileIO();
+ fileIO.configure(catalogContext);
+
+ privilegedCatalog =
+ new PrivilegedCatalog(
+ CatalogFactory.createCatalog(catalogContext),
+ new FileBasedPrivilegeManagerLoader(
+ warehouse, fileIO, rootUser, rootPassword));
+ if (!privilegedCatalog.privilegeManager().privilegeEnabled()) {
+
privilegedCatalog.privilegeManager().initializePrivilege(rootPassword);
+ }
+
+ // create user and grant privilege on table
+ privilegedCatalog.privilegeManager().createUser(paimonUser,
paimonUserPassword);
+ String fullTableName = Identifier.create(DATABASE_NAME,
TABLE_NAME).getFullName();
+ privilegedCatalog.privilegeManager().grant(paimonUser, "",
PrivilegeType.CREATE_DATABASE);
+ privilegedCatalog
+ .privilegeManager()
+ .grant(paimonUser, DATABASE_NAME, PrivilegeType.DROP_DATABASE);
+ privilegedCatalog
+ .privilegeManager()
+ .grant(paimonUser, fullTableName, PrivilegeType.DROP_TABLE);
+ privilegedCatalog
+ .privilegeManager()
+ .grant(paimonUser, DATABASE_NAME, PrivilegeType.CREATE_TABLE);
+ }
+
+ private void grantPrivilege(List<PrivilegeType> privilegeTypes) {
+ String fullTableName = Identifier.create(DATABASE_NAME,
TABLE_NAME).getFullName();
+ if (!CollectionUtils.isEmpty(privilegeTypes)) {
+ for (PrivilegeType type : privilegeTypes) {
+ privilegedCatalog.privilegeManager().grant(paimonUser,
fullTableName, type);
+ }
+ }
+ }
+
+ private void revokePrivilege(List<PrivilegeType> privilegeTypes) {
+ String fullTableName = Identifier.create(DATABASE_NAME,
TABLE_NAME).getFullName();
+ if (!CollectionUtils.isEmpty(privilegeTypes)) {
+ for (PrivilegeType type : privilegeTypes) {
+ privilegedCatalog.privilegeManager().revoke(paimonUser,
fullTableName, type);
+ }
+ }
+ }
+
+ /** User not grant read privilege read data test cases for the Paimon
table. */
+ @Test
+ public void privilegeEnabledPaimonSourceAuthorized() throws Exception {
Review Comment:
After these two unit test methods complete execution, the granted
permissions are revoked, then reauthorized, and they are separate from the
original bucket.
##########
seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonPrivilegeCatalogTest.java:
##########
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.event.EventListener;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.common.utils.ReflectionUtils;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonBaseOptions;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfiguration;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSinkWriter;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket.PaimonBucketAssignerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.ResolvingFileIO;
+import org.apache.paimon.privilege.FileBasedPrivilegeManagerLoader;
+import org.apache.paimon.privilege.NoPrivilegeException;
+import org.apache.paimon.privilege.PrivilegeType;
+import org.apache.paimon.privilege.PrivilegedCatalog;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+@Slf4j
+public class PaimonPrivilegeCatalogTest {
+
+ private static PaimonCatalog authorizedCatalog;
+ private static PaimonCatalog unAuthorizedCatalog;
+ private static PaimonCatalog rootUserPaimonCatalog;
+ private static String CATALOG_NAME = "paimon_catalog";
+ private static String DATABASE_NAME = "test_db";
+ private static String TABLE_NAME = "test_table";
+
+ private PaimonSinkWriter paimonSinkWriter;
+ private static SinkWriter.Context context;
+ private final String commitUser = UUID.randomUUID().toString();
+ private static CatalogTable catalogTable;
+ @TempDir protected static java.nio.file.Path temporaryFolder;
+ private static String warehouse;
+ private static String rootUser = "root";
+ private static String rootPassword = "123456";
+ private static String bucketKey = "f0";
+ private static String authorizeUser = "paimon";
+ private static String authorizeUserPassword = "123456";
+ private static String unAuthorizeUser = "unauthorized_paimon";
+ private static String unAuthorizeUserPassword = "123456";
+
+ @BeforeAll
+ public static void before() {
+ warehouse = new File(temporaryFolder.toFile(),
UUID.randomUUID().toString()).toString();
+ initPrivilege();
+ rootUserPaimonCatalog = createPaimonCatalog(rootUser, rootPassword);
+ authorizedCatalog = createPaimonCatalog(authorizeUser,
authorizeUserPassword);
+ unAuthorizedCatalog = createPaimonCatalog(unAuthorizeUser,
unAuthorizeUserPassword);
+
+ createUser(authorizeUser, authorizeUserPassword);
+ grantPrivilege(
+ authorizeUser,
+ new PrivilegeType[] {
+ PrivilegeType.CREATE_TABLE,
+ PrivilegeType.ALTER_TABLE,
+ PrivilegeType.SELECT,
+ PrivilegeType.INSERT
+ });
+ createUser(unAuthorizeUser, unAuthorizeUserPassword);
+
+ createDatabase();
+ catalogTable = buildTable(TABLE_NAME);
+
+ TablePath tablePath = TablePath.of(DATABASE_NAME, TABLE_NAME);
+ rootUserPaimonCatalog.createTable(tablePath, catalogTable, false);
+
+ context =
+ new SinkWriter.Context() {
+ @Override
+ public int getIndexOfSubtask() {
+ return 0;
+ }
+
+ @Override
+ public MetricsContext getMetricsContext() {
+ return null;
+ }
+
+ @Override
+ public EventListener getEventListener() {
+ return null;
+ }
+ };
+ }
+
+ private static CatalogTable buildTable(String tableName) {
+ TableSchema.Builder schemaBuilder = TableSchema.builder();
+ for (int i = 0; i < 5; i++) {
+ schemaBuilder.column(
+ PhysicalColumn.of(
+ "f" + i,
+ BasicType.STRING_TYPE,
+ (Long) null,
+ false,
+ null,
+ String.format("f%s col", i)));
+ }
+
+ TableSchema tableSchema =
+ schemaBuilder.primaryKey(PrimaryKey.of("pk",
Arrays.asList("f0"))).build();
+
+ CatalogTable cTable =
+ CatalogTable.of(
+ TableIdentifier.of(CATALOG_NAME, DATABASE_NAME,
tableName),
+ tableSchema,
+ new HashMap<>(),
+ new ArrayList<>(),
+ "test table");
+ return cTable;
+ }
+
+ private static void initPrivilege() {
+ org.apache.paimon.options.Options catalogOptions = new
org.apache.paimon.options.Options();
+ catalogOptions.set(PaimonBaseOptions.WAREHOUSE.key(), warehouse);
+ CatalogContext catalogContext = CatalogContext.create(catalogOptions);
+ FileIO fileIO = new ResolvingFileIO();
+ fileIO.configure(catalogContext);
+
+ PrivilegedCatalog priCatalog =
+ new PrivilegedCatalog(
+ CatalogFactory.createCatalog(catalogContext),
+ new FileBasedPrivilegeManagerLoader(
+ warehouse, fileIO, rootUser, rootPassword));
+ if (!priCatalog.privilegeManager().privilegeEnabled()) {
+ priCatalog.privilegeManager().initializePrivilege(rootPassword);
+ }
+ }
+
+ private static void createUser(String user, String password) {
+ Optional<Object> catalog =
ReflectionUtils.getField(rootUserPaimonCatalog, "catalog");
+ assertTrue(catalog.isPresent() && catalog.get() instanceof
PrivilegedCatalog);
+ PrivilegedCatalog priCatalog = (PrivilegedCatalog) catalog.get();
+ priCatalog.privilegeManager().createUser(user, password);
+ }
+
+ private static void grantPrivilege(String user, PrivilegeType[]
privilegeTypes) {
+ Optional<Object> catalog =
ReflectionUtils.getField(rootUserPaimonCatalog, "catalog");
+ assertTrue(catalog.isPresent() && catalog.get() instanceof
PrivilegedCatalog);
+ PrivilegedCatalog priCatalog = (PrivilegedCatalog) catalog.get();
+ String fullTableName = Identifier.create(DATABASE_NAME,
TABLE_NAME).getFullName();
+ for (PrivilegeType type : privilegeTypes) {
+ if (type == PrivilegeType.CREATE_TABLE) {
+ priCatalog
+ .privilegeManager()
+ .grant(user, DATABASE_NAME,
PrivilegeType.CREATE_TABLE);
+ } else {
+ priCatalog.privilegeManager().grant(user, fullTableName, type);
+ }
+ }
+ }
+
+ private static void createDatabase() {
+ try {
+ TablePath tablePath = TablePath.of(DATABASE_NAME, TABLE_NAME);
+ rootUserPaimonCatalog.createDatabase(tablePath, false);
+ } catch (DatabaseAlreadyExistException e) {
+ log.info("database already exist");
+ }
+ }
+
+ private static Map<String, Object> getPaimonProperties() {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put("warehouse", warehouse);
+ properties.put("plugin_name", "Paimon");
+ properties.put("database", DATABASE_NAME);
+ properties.put("table", TABLE_NAME);
+ Map<String, String> writeProps = new HashMap<>();
+ writeProps.put("bucket", "2");
+ writeProps.put("bucket-key", bucketKey);
+ properties.put("paimon.table.write-props", writeProps);
+ return properties;
+ }
+
+ private static PaimonCatalog createPaimonCatalog(String user, String
password) {
+ Map<String, Object> properties = getPaimonProperties();
+ if (StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password)) {
+ properties.put("user", user);
+ properties.put("password", password);
+ }
+ PaimonCatalog pCatalog =
+ new PaimonCatalog(CATALOG_NAME,
ReadonlyConfig.fromMap(properties));
+ pCatalog.open();
+ return pCatalog;
+ }
+
+ @Test
+ public void createCatalogWithNotUserAndPassword() {
+ assertThrows(
+ PaimonConnectorException.class,
+ () -> {
+ try {
+ createPaimonCatalog(null, null);
+ } catch (PaimonConnectorException e) {
+ assertTrue(
+ e.getMessage()
+ .contains(
+ "paimon privilege is enabled,
user and password is required"));
+ throw e;
+ }
+ });
+ }
+
+ @Test
+ public void createCatalogWithErrorPassword() {
+ PaimonCatalog catalog = createPaimonCatalog(authorizeUser,
"errorpassword");
+ assertThrows(
+ CatalogException.class,
+ () -> {
+ TablePath tablePath = TablePath.of(DATABASE_NAME,
TABLE_NAME);
+ try {
+ catalog.createTable(tablePath, catalogTable, false);
+ } catch (CatalogException e) {
+ assertTrue(
+ e.getCause()
+ .getMessage()
+ .contains(
+ String.format(
+ "User %s not found, or
password incorrect.",
+ authorizeUser)));
+ throw e;
+ }
+ });
+ }
+
+ @Test
+ public void testCreateTable() {
+ TablePath tablePath = TablePath.of(DATABASE_NAME,
"privilege_test_table");
+ CatalogTable catalogTable = buildTable("privilege_test_table");
+ // The permission to create tables
+ authorizedCatalog.createTable(tablePath, catalogTable, false);
+
+ // No permission to create tables
+ assertThrows(
+ CatalogException.class,
+ () -> {
+ try {
+ unAuthorizedCatalog.createTable(tablePath,
catalogTable, false);
+ } catch (CatalogException e) {
+ assertTrue(
+ e.getCause()
+ .getMessage()
+ .contains(
+ String.format(
+ "User %s doesn't have
privilege CREATE_TABLE on",
+ unAuthorizeUser)));
+ throw e;
+ }
+ });
+ }
+
+ @Test
+ public void testWriteTable() {
Review Comment:
Unit tests have been added for these two scenarios in this class
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]