leekeiabstraction commented on code in PR #1941:
URL: https://github.com/apache/fluss/pull/1941#discussion_r2722558220


##########
fluss-filesystems/fluss-fs-abfs/src/main/java/org/apache/fluss/fs/abfs/token/AzureDelegationTokenReceiver.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.abfs.token;
+
+import org.apache.fluss.fs.token.Credentials;
+import org.apache.fluss.fs.token.CredentialsJsonSerde;
+import org.apache.fluss.fs.token.ObtainedSecurityToken;
+import org.apache.fluss.fs.token.SecurityTokenReceiver;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/** Security token receiver for the abfs filesystem. */
+public class AzureDelegationTokenReceiver implements SecurityTokenReceiver {
+    public static final String PROVIDER_CONFIG_NAME = 
"fs.azure.account.oauth.provider.type";
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AzureDelegationTokenReceiver.class);
+
+    static volatile Credentials credentials;
+    static volatile Map<String, String> additionInfos;
+
+    public static void updateHadoopConfig(org.apache.hadoop.conf.Configuration 
hadoopConfig) {
+        LOG.info("Updating Hadoop configuration");
+
+        String providers = hadoopConfig.get(PROVIDER_CONFIG_NAME, "");
+
+        if 
(!providers.contains(DynamicTemporaryAzureCredentialsProvider.NAME)) {
+            if (providers.isEmpty()) {
+                LOG.debug("Setting provider");
+                providers = DynamicTemporaryAzureCredentialsProvider.NAME;
+            } else {
+                providers = DynamicTemporaryAzureCredentialsProvider.NAME + 
"," + providers;
+                LOG.debug("Prepending provider, new providers value: {}", 
providers);
+            }
+            hadoopConfig.set(PROVIDER_CONFIG_NAME, providers);
+        } else {
+            LOG.debug("Provider already exists");
+        }
+
+        // then, set addition info
+        if (additionInfos == null) {
+            // if addition info is null, it also means we have not received 
any token,
+            // we throw InvalidCredentialsException
+            throw new 
IllegalStateException(DynamicTemporaryAzureCredentialsProvider.COMPONENT);
+        } else {
+            for (Map.Entry<String, String> entry : additionInfos.entrySet()) {
+                hadoopConfig.set(entry.getKey(), entry.getValue());
+            }
+        }
+
+        LOG.info("Updated Hadoop configuration successfully");
+    }
+
+    @Override
+    public String scheme() {
+        return "abfs";
+    }
+
+    @Override
+    public void onNewTokensObtained(ObtainedSecurityToken token) {
+        LOG.info("Updating session credentials");
+
+        byte[] tokenBytes = token.getToken();
+
+        credentials = CredentialsJsonSerde.fromJson(tokenBytes);
+        additionInfos = token.getAdditionInfos();
+
+        LOG.info(
+                "Session credentials updated successfully with access key: 
{}.",
+                credentials.getAccessKeyId());

Review Comment:
   This will always be logged as null, because AzureDelegationTokenProvider 
instantiates the Credentials object with a null `accessKeyId`. Can we provide a 
more useful log message?



##########
fluss-filesystems/fluss-fs-abfs/src/main/java/org/apache/fluss/fs/abfs/token/AzureDelegationTokenReceiver.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.abfs.token;
+
+import org.apache.fluss.fs.token.Credentials;
+import org.apache.fluss.fs.token.CredentialsJsonSerde;
+import org.apache.fluss.fs.token.ObtainedSecurityToken;
+import org.apache.fluss.fs.token.SecurityTokenReceiver;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/** Security token receiver for the abfs filesystem. */
+public class AzureDelegationTokenReceiver implements SecurityTokenReceiver {
+    public static final String PROVIDER_CONFIG_NAME = 
"fs.azure.account.oauth.provider.type";
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AzureDelegationTokenReceiver.class);
+
+    static volatile Credentials credentials;
+    static volatile Map<String, String> additionInfos;
+
+    public static void updateHadoopConfig(org.apache.hadoop.conf.Configuration 
hadoopConfig) {
+        LOG.info("Updating Hadoop configuration");
+
+        String providers = hadoopConfig.get(PROVIDER_CONFIG_NAME, "");
+
+        if 
(!providers.contains(DynamicTemporaryAzureCredentialsProvider.NAME)) {
+            if (providers.isEmpty()) {
+                LOG.debug("Setting provider");
+                providers = DynamicTemporaryAzureCredentialsProvider.NAME;
+            } else {
+                providers = DynamicTemporaryAzureCredentialsProvider.NAME + 
"," + providers;
+                LOG.debug("Prepending provider, new providers value: {}", 
providers);
+            }
+            hadoopConfig.set(PROVIDER_CONFIG_NAME, providers);
+        } else {
+            LOG.debug("Provider already exists");
+        }
+
+        // then, set addition info
+        if (additionInfos == null) {
+            // if addition info is null, it also means we have not received 
any token,
+            // we throw InvalidCredentialsException
+            throw new 
IllegalStateException(DynamicTemporaryAzureCredentialsProvider.COMPONENT);

Review Comment:
   The exception type named in the comment (`InvalidCredentialsException`) doesn't match the one actually thrown (`IllegalStateException`).



##########
fluss-filesystems/fluss-fs-abfs/src/test/java/org/apache/fluss/fs/abfs/token/AzureDelegationTokenProviderTest.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.abfs.token;
+
+import org.apache.fluss.fs.token.Credentials;
+import org.apache.fluss.fs.token.CredentialsJsonSerde;
+import org.apache.fluss.fs.token.ObtainedSecurityToken;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link AzureDelegationTokenProvider}. */
+public class AzureDelegationTokenProviderTest {
+
+    private static final String CONFIG_PREFIX = 
"fs.azure.account.oauth2.client";
+    private static final String CLIENT_ID = "testClientId";
+    private static final String CLIENT_SECRET = "testClientSecret";
+
+    private static final String ENDPOINT_KEY = "http://localhost:8080";
+
+    private static MockAuthServer mockGSServer;

Review Comment:
   Should this say abfs instead of GS (e.g. `mockAbfsServer` rather than `mockGSServer`)?



##########
fluss-filesystems/fluss-fs-abfs/src/test/java/org/apache/fluss/fs/abfs/token/DynamicTemporaryAzureCredentialsProviderTest.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.abfs.token;
+
+import org.apache.fluss.fs.token.Credentials;
+import org.apache.fluss.fs.token.CredentialsJsonSerde;
+import org.apache.fluss.fs.token.ObtainedSecurityToken;
+
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.TokenAccessProviderException;
+import org.apache.hadoop.fs.azurebfs.oauth2.AzureADToken;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link DynamicTemporaryAzureCredentialsProvider}. */
+class DynamicTemporaryAzureCredentialsProviderTest {
+
+    private static final String CLIENT_ID = "testClientId";
+    private static final String CLIENT_SECRET = "testClientSecret";

Review Comment:
   Let's use null values for these to be consistent with the behaviour of 
Provider.



##########
fluss-filesystems/fluss-fs-abfs/src/test/java/org/apache/fluss/fs/abfs/token/DynamicTemporaryAzureCredentialsProviderTest.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.abfs.token;
+
+import org.apache.fluss.fs.token.Credentials;
+import org.apache.fluss.fs.token.CredentialsJsonSerde;
+import org.apache.fluss.fs.token.ObtainedSecurityToken;
+
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.TokenAccessProviderException;
+import org.apache.hadoop.fs.azurebfs.oauth2.AzureADToken;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link DynamicTemporaryAzureCredentialsProvider}. */
+class DynamicTemporaryAzureCredentialsProviderTest {
+
+    private static final String CLIENT_ID = "testClientId";
+    private static final String CLIENT_SECRET = "testClientSecret";
+
+    private static final String SESSION_TOKEN = "sessionToken";
+
+    @Test
+    void getCredentialsShouldThrowExceptionWhenNoCredentials() {
+        DynamicTemporaryAzureCredentialsProvider provider =
+                new DynamicTemporaryAzureCredentialsProvider();
+
+        
assertThatThrownBy(provider::getToken).isInstanceOf(TokenAccessProviderException.class);
+    }
+
+    @Test
+    void getCredentialsShouldStoreCredentialsWhenCredentialsProvided() throws 
Exception {
+        DynamicTemporaryAzureCredentialsProvider provider =
+                new DynamicTemporaryAzureCredentialsProvider();
+        Credentials credentials = new Credentials(CLIENT_ID, CLIENT_SECRET, 
SESSION_TOKEN);
+
+        AzureDelegationTokenReceiver receiver = new 
AzureDelegationTokenReceiver();
+
+        byte[] json = CredentialsJsonSerde.toJson(credentials);
+
+        ObtainedSecurityToken obtainedSecurityToken =
+                new ObtainedSecurityToken("abfs", json, 1L, new HashMap<>());
+        receiver.onNewTokensObtained(obtainedSecurityToken);
+
+        AzureADToken azureADToken = provider.getToken();
+        
assertThat(azureADToken.getAccessToken()).isEqualTo(credentials.getSecurityToken());

Review Comment:
   Let's also assert on `accessKeyId` and `secretAccessKey`



##########
fluss-filesystems/fluss-fs-abfs/src/test/java/org/apache/fluss/fs/abfs/token/AuthServerHandler.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.abfs.token;
+
+import org.apache.fluss.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.fluss.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import 
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
+import 
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpObject;
+import org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.fluss.shaded.netty4.io.netty.util.AsciiString;
+import org.apache.fluss.utils.IOUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+
+import static 
org.apache.fluss.shaded.guava32.com.google.common.net.HttpHeaders.CONTENT_ENCODING;
+import static 
org.apache.fluss.shaded.guava32.com.google.common.net.HttpHeaders.CONTENT_LENGTH;
+import static 
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
+import static 
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
+import static 
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpHeaderValues.APPLICATION_JSON;
+import static 
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpHeaderValues.CLOSE;
+import static 
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static 
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK;
+
+/** Netty Handler for facilitating the Google auth token generation. */
+public class AuthServerHandler extends SimpleChannelInboundHandler<HttpObject> 
{
+
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) {
+        ctx.flush();
+    }
+
+    @Override
+    public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
+        if (msg instanceof HttpRequest) {
+            HttpRequest req = (HttpRequest) msg;
+
+            try {
+                URI url = URI.create(req.uri());
+                if (req.method().equals(HttpMethod.POST)) {
+                    postRequest(ctx, url, req);
+                } else {
+                    getRequest(ctx, url, req);
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private void postRequest(ChannelHandlerContext ctx, URI url, HttpRequest 
req)
+            throws IOException {
+        if (url.getPath().endsWith("/")) {
+            jsonResponse(ctx, req, "create-token.json");
+        } else {
+            response(ctx, req, new byte[] {}, NOT_FOUND, APPLICATION_JSON);
+        }
+    }
+
+    private void getRequest(ChannelHandlerContext ctx, URI url, HttpRequest 
req)
+            throws IOException {
+        if (url.getPath().endsWith("/token")) {
+            jsonResponse(ctx, req, "create-token.json");
+        } else {
+            response(ctx, req, new byte[] {}, NOT_FOUND, APPLICATION_JSON);
+        }
+    }
+
+    private void jsonResponse(ChannelHandlerContext ctx, HttpRequest req, 
String path)
+            throws IOException {
+        jsonResponse(ctx, req, path, OK);
+    }
+
+    private void jsonResponse(
+            ChannelHandlerContext ctx,
+            HttpRequest req,
+            String path,
+            HttpResponseStatus responseStatus)
+            throws IOException {
+        response(ctx, req, readFromResources(path), responseStatus, 
APPLICATION_JSON);
+    }
+
+    private static void response(
+            ChannelHandlerContext ctx,
+            HttpRequest req,
+            byte[] bytes,
+            HttpResponseStatus status,
+            AsciiString contentType) {
+        FullHttpResponse response =
+                new DefaultFullHttpResponse(
+                        req.protocolVersion(), status, 
Unpooled.wrappedBuffer(bytes));
+        response.headers()
+                .set(CONTENT_TYPE, contentType)
+                .set("Location", "http://localhost:8080/resumbable-upload")

Review Comment:
   Question for my understanding: why do we need this endpoint in the 
test's AuthServerHandler?



##########
fluss-filesystems/fluss-fs-abfs/src/test/java/org/apache/fluss/fs/abfs/token/AuthServerHandler.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.abfs.token;
+
+import org.apache.fluss.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.fluss.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.fluss.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import 
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
+import 
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpObject;
+import org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.fluss.shaded.netty4.io.netty.util.AsciiString;
+import org.apache.fluss.utils.IOUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+
+import static 
org.apache.fluss.shaded.guava32.com.google.common.net.HttpHeaders.CONTENT_ENCODING;
+import static 
org.apache.fluss.shaded.guava32.com.google.common.net.HttpHeaders.CONTENT_LENGTH;
+import static 
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
+import static 
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
+import static 
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpHeaderValues.APPLICATION_JSON;
+import static 
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpHeaderValues.CLOSE;
+import static 
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static 
org.apache.fluss.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK;
+
+/** Netty Handler for facilitating the Google auth token generation. */

Review Comment:
   Should this say Azure instead of Google?



##########
fluss-filesystems/fluss-fs-abfs/src/test/java/org/apache/fluss/fs/abfs/AbfsFileSystemBehaviorITCase.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.abfs;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.fs.FileSystem;
+import org.apache.fluss.fs.FileSystemBehaviorTestSuite;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.fs.abfs.token.MockAuthServer;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+
+import java.io.IOException;
+import java.net.URI;
+
+/** Tests that validate the behavior of the Google Cloud Storage File System 
Plugin. */
+class AbfsFileSystemBehaviorITCase extends FileSystemBehaviorTestSuite {
+
+    private static final String CONFIG_PREFIX = "fs.azure.account";
+    private static final String CLIENT_ID = "testClientId";
+    private static final String CLIENT_SECRET = "testClientSecret";
+
+    private static final String AZURE_ACCOUNT_KEY = "ZmFrZS1rZXkK";
+    private static final String ENDPOINT_KEY = "http://localhost:8080";
+    public static final String ABFS_FS_PATH = 
"abfs://[email protected]/test";
+
+    private static MockAuthServer mockGSServer;

Review Comment:
   Should this be named `mockAbfsServer` instead?



##########
fluss-filesystems/fluss-fs-abfs/src/test/java/org/apache/fluss/fs/abfs/token/AzureDelegationTokenProviderTest.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.abfs.token;
+
+import org.apache.fluss.fs.token.Credentials;
+import org.apache.fluss.fs.token.CredentialsJsonSerde;
+import org.apache.fluss.fs.token.ObtainedSecurityToken;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link AzureDelegationTokenProvider}. */
+public class AzureDelegationTokenProviderTest {
+
+    private static final String CONFIG_PREFIX = 
"fs.azure.account.oauth2.client";
+    private static final String CLIENT_ID = "testClientId";
+    private static final String CLIENT_SECRET = "testClientSecret";
+
+    private static final String ENDPOINT_KEY = "http://localhost:8080";
+
+    private static MockAuthServer mockGSServer;
+
+    @BeforeAll
+    static void setup() {
+        mockGSServer = MockAuthServer.create();
+    }
+
+    @Test
+    void obtainSecurityTokenShouldReturnSecurityToken() {
+        Configuration configuration = new Configuration();
+        configuration.set(CONFIG_PREFIX + ".id", CLIENT_ID);
+        configuration.set(CONFIG_PREFIX + ".secret", CLIENT_SECRET);
+        configuration.set(CONFIG_PREFIX + ".endpoint", ENDPOINT_KEY);
+        AzureDelegationTokenProvider azureDelegationTokenProvider =
+                new AzureDelegationTokenProvider("abfs", configuration);
+        ObtainedSecurityToken obtainedSecurityToken =
+                azureDelegationTokenProvider.obtainSecurityToken();
+        byte[] token = obtainedSecurityToken.getToken();
+        Credentials credentials = CredentialsJsonSerde.fromJson(token);
+        assertThat(credentials.getSecurityToken()).isEqualTo("token");

Review Comment:
   We should also assert on `accessKeyId` and `secretAccessKey`, even if they 
are expected to be null.



##########
fluss-filesystems/fluss-fs-abfs/src/test/java/org/apache/fluss/fs/abfs/token/AzureDelegationTokenReceiverTest.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.abfs.token;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+
+import static 
org.apache.fluss.fs.abfs.token.AzureDelegationTokenReceiver.PROVIDER_CONFIG_NAME;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link AzureDelegationTokenReceiver}. */
+class AzureDelegationTokenReceiverTest {
+
+    private static final String PROVIDER_CLASS_NAME = "TestProvider";
+    private static final String REGION = "testRegion";

Review Comment:
   Let's remove REGION if it is unused.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to