SML0127 commented on code in PR #2263:
URL: https://github.com/apache/fluss/pull/2263#discussion_r2768930623


##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/security/acl/FlinkAuthorizationITCase.java:
##########
@@ -132,6 +141,152 @@ void after() throws ExecutionException, InterruptedException {
         dropAcl(Resource.any(), OperationType.ANY);
     }
 
+    /** Tests Kerberos (GSSAPI) authorization. */
+    @Test
+    void testKerberosAuthorization() throws Exception {
+        // Initialize and start a MiniKDC
+        Properties kdcConf = MiniKdc.createConf();
+        FlussMiniKdc kdc = new FlussMiniKdc(kdcConf);
+        kdc.start();
+
+        // Prepare workspace for keytab and krb5.conf
+        Path tempDir = Files.createTempDirectory("fluss-gssapi-test-" + 
UUID.randomUUID());
+        File workDir = tempDir.toFile();
+        File keytab = new File(workDir, "fluss.keytab");
+        File krb5Conf = kdc.getKrb5Conf();
+
+        try {
+            // Create principal for server and client
+            // Format
+            // - server: service/hostname
+            // - client: username
+            kdc.createPrincipal(keytab, "fluss/127.0.0.1", "client");
+
+            // Customize krb5.conf to enforce TCP and correct realm settings 
if necessary.
+            if (krb5Conf.exists()) {
+                String krb5Content =
+                        "[libdefaults]\n"
+                                + "    default_realm = "
+                                + kdc.getRealm()
+                                + "\n"
+                                + "    udp_preference_limit = 1\n"
+                                + "    kdc_tcp_port = "
+                                + kdc.getPort()
+                                + "\n"
+                                + "\n"
+                                + "[realms]\n"
+                                + "    "
+                                + kdc.getRealm()
+                                + " = {\n"
+                                + "        kdc = 127.0.0.1:"
+                                + kdc.getPort()
+                                + "\n"
+                                + "        admin_server = 127.0.0.1:"
+                                + kdc.getPort()
+                                + "\n"
+                                + "    }\n";
+
+                File customKrb5Conf =
+                        new File(workDir, "krb5-custom-" + UUID.randomUUID() + 
".conf");
+                Files.write(customKrb5Conf.toPath(), krb5Content.getBytes());
+                System.setProperty("java.security.krb5.conf", 
customKrb5Conf.getAbsolutePath());
+            }
+
+            String realm = kdc.getRealm();
+            String serverPrincipal = String.format("fluss/127.0.0.1@%s", 
realm);
+            String clientPrincipal = String.format("client@%s", realm);
+
+            // Configure Fluss Cluster
+            Configuration serverConf = initConfig();
+            
serverConf.setString(ConfigOptions.SERVER_SECURITY_PROTOCOL_MAP.key(), 
"CLIENT:sasl");
+            serverConf.setString(
+                    ConfigOptions.SERVER_SASL_ENABLED_MECHANISMS_CONFIG.key(), 
"GSSAPI");
+
+            String serverJaas =
+                    String.format(
+                            "com.sun.security.auth.module.Krb5LoginModule 
required "
+                                    + "useKeyTab=true storeKey=true 
useTicketCache=false "
+                                    + "keyTab=\"%s\" principal=\"%s\";",
+                            keytab.getAbsolutePath(), serverPrincipal);
+            serverConf.setString("security.sasl.gssapi.jaas.config", 
serverJaas);
+
+            // Grant Super User privileges to client principal
+            serverConf.set(
+                    ConfigOptions.SUPER_USERS,
+                    "User:"
+                            + clientPrincipal
+                            + ";User:"
+                            + clientPrincipal.toLowerCase()
+                            + ";User:client");
+            // Enable authorization to ensure permissions are actually checked.
+            serverConf.set(ConfigOptions.AUTHORIZER_ENABLED, true);
+
+            // Create Fluss cluster with the secure configuration
+            final FlussClusterExtension kerberosFluss =
+                    FlussClusterExtension.builder()
+                            .setNumOfTabletServers(3)
+                            // Use 127.0.0.1 listeners to match kerberos 
service principal hostname
+                            .setCoordinatorServerListeners(
+                                    "FLUSS://127.0.0.1:0, 
CLIENT://127.0.0.1:0")
+                            .setTabletServerListeners("FLUSS://127.0.0.1:0, 
CLIENT://127.0.0.1:0")
+                            .setClusterConf(serverConf)
+                            .build();
+            try {
+                // Start Fluss cluster
+                kerberosFluss.start();
+
+                Configuration clientConf = 
kerberosFluss.getClientConfig("CLIENT");
+                String bootstrapServers =
+                        String.join(",", 
clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
+
+                TableEnvironment tEnv =
+                        
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+
+                String clientJaas =
+                        String.format(
+                                "com.sun.security.auth.module.Krb5LoginModule 
required "
+                                        + "useKeyTab=true storeKey=true 
useTicketCache=false "
+                                        + "keyTab=\"%s\" principal=\"%s\";",
+                                keytab.getAbsolutePath(), clientPrincipal);
+
+                // Create a Flink catalog configured with Kerberos credentials.
+                String createCatalogDDL =

Review Comment:
   Sure, I will add those cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to