This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 4312b632b [#3760] improvement(client-python): Add Docker env and 
PyGVFS Integration tests (#3876)
4312b632b is described below

commit 4312b632b225b5a5a2ea6e0f0bdcbdd3092a1db8
Author: xloya <[email protected]>
AuthorDate: Fri Jul 5 11:26:27 2024 +0800

    [#3760] improvement(client-python): Add Docker env and PyGVFS Integration 
tests (#3876)
    
    ### What changes were proposed in this pull request?
    
    Add a Hive Docker environment for client-python and add integration
    tests for PyGVFS with HDFS. Depends on #3528.
    
    ### Why are the changes needed?
    
    Fix: #3760
    
    ### How was this patch tested?
    
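    Added integration tests (`test_gvfs_with_hdfs.py`) that exercise PyGVFS
    against HDFS running in a Hive Docker container.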
    
    ---------
    
    Co-authored-by: xiaojiebao <[email protected]>
---
 .github/workflows/python-integration-test.yml      |   2 +-
 clients/client-python/build.gradle.kts             |  55 +-
 clients/client-python/requirements-dev.txt         |   3 +-
 .../tests/integration/base_hadoop_env.py           | 101 +++
 .../tests/integration/hdfs_container.py            | 158 +++++
 .../tests/integration/integration_test_env.py      |  86 +++
 .../tests/integration/test_gvfs_with_hdfs.py       | 704 +++++++++++++++++++++
 .../tests/integration/test_simple_auth_client.py   |   4 +-
 8 files changed, 1098 insertions(+), 15 deletions(-)

diff --git a/.github/workflows/python-integration-test.yml 
b/.github/workflows/python-integration-test.yml
index f2e5fd4ed..a7ffacfd7 100644
--- a/.github/workflows/python-integration-test.yml
+++ b/.github/workflows/python-integration-test.yml
@@ -66,7 +66,7 @@ jobs:
           for pythonVersion in "3.8" "3.9" "3.10" "3.11"
           do
             echo "Use Python version ${pythonVersion} to test the Python 
client."
-            ./gradlew -PjdkVersion=${{ matrix.java-version }} 
-PpythonVersion=${pythonVersion} :clients:client-python:test
+            ./gradlew -PjdkVersion=${{ matrix.java-version }} 
-PpythonVersion=${pythonVersion} -PskipDockerTests=false 
:clients:client-python:test
             # Clean Gravitino database to clean test data
             rm -rf ./distribution/package/data
           done
diff --git a/clients/client-python/build.gradle.kts 
b/clients/client-python/build.gradle.kts
index 68cc897e5..2cf83c376 100644
--- a/clients/client-python/build.gradle.kts
+++ b/clients/client-python/build.gradle.kts
@@ -16,12 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+import de.undercouch.gradle.tasks.download.Download
+import de.undercouch.gradle.tasks.download.Verify
 import io.github.piyushroshan.python.VenvTask
 import java.net.HttpURLConnection
 import java.net.URL
 
 plugins {
   id("io.github.piyushroshan.python-gradle-miniforge-plugin") version "1.0.0"
+  id("de.undercouch.download") version "5.6.0"
 }
 
 pythonPlugin {
@@ -148,6 +151,10 @@ fun generatePypiProjectHomePage() {
   }
 }
 
+val hadoopVersion = "2.7.3"
+val hadoopPackName = "hadoop-${hadoopVersion}.tar.gz"
+val hadoopDirName = "hadoop-${hadoopVersion}"
+val hadoopDownloadUrl = 
"https://archive.apache.org/dist/hadoop/core/hadoop-${hadoopVersion}/${hadoopPackName}"
 tasks {
   val pipInstall by registering(VenvTask::class) {
     venvExec = "pip"
@@ -173,6 +180,26 @@ tasks {
     workingDir = projectDir.resolve("./tests/integration")
   }
 
+  val build by registering(VenvTask::class) {
+    dependsOn(pylint)
+    venvExec = "python"
+    args = listOf("scripts/generate_version.py")
+  }
+
+  val downloadHadoopPack by registering(Download::class) {
+    dependsOn(build)
+    onlyIfModified(true)
+    src(hadoopDownloadUrl)
+    dest(layout.buildDirectory.dir("tmp"))
+  }
+
+  val verifyHadoopPack by registering(Verify::class) {
+    dependsOn(downloadHadoopPack)
+    src(layout.buildDirectory.file("tmp/${hadoopPackName}"))
+    algorithm("MD5")
+    checksum("3455bb57e4b4906bbea67b58cca78fa8")
+  }
+
   val integrationTest by registering(VenvTask::class) {
     doFirst {
       gravitinoServer("start")
@@ -181,11 +208,23 @@ tasks {
     venvExec = "coverage"
     args = listOf("run", "--branch", "-m", "unittest")
     workingDir = projectDir.resolve("./tests/integration")
-    environment = mapOf(
-      "PROJECT_VERSION" to project.version,
-      "GRAVITINO_HOME" to project.rootDir.path + "/distribution/package",
-      "START_EXTERNAL_GRAVITINO" to "true"
-    )
+    val dockerTest = project.rootProject.extra["dockerTest"] as? Boolean ?: 
false
+    val envMap = mapOf<String, Any>().toMutableMap()
+    if (dockerTest) {
+      dependsOn("verifyHadoopPack")
+      envMap.putAll(mapOf(
+          "HADOOP_VERSION" to hadoopVersion,
+          "PYTHON_BUILD_PATH" to project.rootDir.path + 
"/clients/client-python/build"
+      ))
+    }
+    envMap.putAll(mapOf(
+        "PROJECT_VERSION" to project.version,
+        "GRAVITINO_HOME" to project.rootDir.path + "/distribution/package",
+        "START_EXTERNAL_GRAVITINO" to "true",
+        "DOCKER_TEST" to dockerTest.toString(),
+        "GRAVITINO_CI_HIVE_DOCKER_IMAGE" to 
"datastrato/gravitino-ci-hive:0.1.12",
+    ))
+    environment = envMap
 
     doLast {
       gravitinoServer("stop")
@@ -224,12 +263,6 @@ tasks {
     }
   }
 
-  val build by registering(VenvTask::class) {
-    dependsOn(pylint)
-    venvExec = "python"
-    args = listOf("scripts/generate_version.py")
-  }
-
   val pydoc by registering(VenvTask::class) {
     venvExec = "python"
     args = listOf("scripts/generate_doc.py")
diff --git a/clients/client-python/requirements-dev.txt 
b/clients/client-python/requirements-dev.txt
index 77387c01c..06f634358 100644
--- a/clients/client-python/requirements-dev.txt
+++ b/clients/client-python/requirements-dev.txt
@@ -26,4 +26,5 @@ pyarrow==15.0.2
 llama-index==0.10.40
 tenacity==8.3.0
 cachetools==5.3.3
-readerwriterlock==1.0.9
\ No newline at end of file
+readerwriterlock==1.0.9
+docker==7.1.0
\ No newline at end of file
diff --git a/clients/client-python/tests/integration/base_hadoop_env.py 
b/clients/client-python/tests/integration/base_hadoop_env.py
new file mode 100644
index 000000000..694331078
--- /dev/null
+++ b/clients/client-python/tests/integration/base_hadoop_env.py
@@ -0,0 +1,101 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+"""
+
+import logging
+import os
+import shutil
+import subprocess
+import tarfile
+
+from gravitino.exceptions.gravitino_runtime_exception import 
GravitinoRuntimeException
+
+logger = logging.getLogger(__name__)
+
+HADOOP_VERSION = os.environ.get("HADOOP_VERSION")
+PYTHON_BUILD_PATH = os.environ.get("PYTHON_BUILD_PATH")
+
+
+class BaseHadoopEnvironment:
+
+    @classmethod
+    def init_hadoop_env(cls):
+        cls._unzip_hadoop_pack()
+        # configure hadoop env
+        cls._configure_hadoop_environment()
+
+    @classmethod
+    def clear_hadoop_env(cls):
+        try:
+            shutil.rmtree(f"{PYTHON_BUILD_PATH}/hadoop")
+        except Exception as e:
+            raise GravitinoRuntimeException(
+                f"Failed to delete dir '{PYTHON_BUILD_PATH}/hadoop': {e}"
+            ) from e
+
+    @classmethod
+    def _unzip_hadoop_pack(cls):
+        hadoop_pack = f"{PYTHON_BUILD_PATH}/tmp/hadoop-{HADOOP_VERSION}.tar.gz"
+        unzip_dir = f"{PYTHON_BUILD_PATH}/hadoop"
+        logger.info("Unzip hadoop pack from %s.", hadoop_pack)
+        # unzip the pack
+        if os.path.exists(unzip_dir):
+            try:
+                shutil.rmtree(unzip_dir)
+            except Exception as e:
+                raise GravitinoRuntimeException(
+                    f"Failed to delete dir '{unzip_dir}': {e}"
+                ) from e
+        try:
+            with tarfile.open(hadoop_pack) as tar:
+                tar.extractall(path=unzip_dir)
+        except Exception as e:
+            raise GravitinoRuntimeException(
+                f"Failed to extract file '{hadoop_pack}': {e}"
+            ) from e
+
+    @classmethod
+    def _configure_hadoop_environment(cls):
+        logger.info("Configure hadoop environment.")
+        os.putenv("HADOOP_USER_NAME", "datastrato")
+        os.putenv("HADOOP_HOME", 
f"{PYTHON_BUILD_PATH}/hadoop/hadoop-{HADOOP_VERSION}")
+        os.putenv(
+            "HADOOP_CONF_DIR",
+            f"{PYTHON_BUILD_PATH}/hadoop/hadoop-{HADOOP_VERSION}/etc/hadoop",
+        )
+        hadoop_shell_path = (
+            f"{PYTHON_BUILD_PATH}/hadoop/hadoop-{HADOOP_VERSION}/bin/hadoop"
+        )
+        # get the classpath
+        try:
+            result = subprocess.run(
+                [hadoop_shell_path, "classpath", "--glob"],
+                capture_output=True,
+                text=True,
+                check=True,
+            )
+            if result.returncode == 0:
+                os.putenv("CLASSPATH", str(result.stdout))
+            else:
+                raise GravitinoRuntimeException(
+                    f"Command failed with return code is not 0, stdout: 
{result.stdout}, stderr:{result.stderr}"
+                )
+        except subprocess.CalledProcessError as e:
+            raise GravitinoRuntimeException(
+                f"Command failed with return code {e.returncode}, 
stderr:{e.stderr}"
+            ) from e
diff --git a/clients/client-python/tests/integration/hdfs_container.py 
b/clients/client-python/tests/integration/hdfs_container.py
new file mode 100644
index 000000000..34bc41d86
--- /dev/null
+++ b/clients/client-python/tests/integration/hdfs_container.py
@@ -0,0 +1,158 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+"""
+
+import asyncio
+import logging
+import os
+import time
+
+import docker
+from docker import types as tp
+from docker.errors import NotFound
+
+from gravitino.exceptions.gravitino_runtime_exception import 
GravitinoRuntimeException
+
+logger = logging.getLogger(__name__)
+
+
+async def check_hdfs_status(hdfs_container):
+    retry_limit = 15
+    for _ in range(retry_limit):
+        try:
+            command_and_args = ["bash", "/tmp/check-status.sh"]
+            exec_result = hdfs_container.exec_run(command_and_args)
+            if exec_result.exit_code != 0:
+                message = (
+                    f"Command {command_and_args} exited with 
{exec_result.exit_code}"
+                )
+                logger.warning(message)
+                logger.warning("output: %s", exec_result.output)
+                output_status_command = ["hdfs", "dfsadmin", "-report"]
+                exec_result = hdfs_container.exec_run(output_status_command)
+                logger.info("HDFS report, output: %s", exec_result.output)
+            else:
+                logger.info("HDFS startup successfully!")
+                return True
+        except Exception as e:
+            logger.error(
+                "Exception occurred while checking HDFS container status: %s", 
e
+            )
+        time.sleep(10)
+    return False
+
+
+async def check_hdfs_container_status(hdfs_container):
+    timeout_sec = 150
+    try:
+        result = await asyncio.wait_for(
+            check_hdfs_status(hdfs_container), timeout=timeout_sec
+        )
+        assert result is True, "HDFS container startup failed!"
+    except asyncio.TimeoutError as e:
+        raise GravitinoRuntimeException(
+            "Timeout occurred while waiting for checking HDFS container 
status."
+        ) from e
+
+
+class HDFSContainer:
+    _docker_client = None
+    _container = None
+    _network = None
+    _ip = ""
+    _network_name = "python-net"
+    _container_name = "python-hdfs"
+
+    def __init__(self):
+        self._docker_client = docker.from_env()
+        self._create_networks()
+        try:
+            container = 
self._docker_client.containers.get(self._container_name)
+            if container is not None:
+                if container.status != "running":
+                    container.restart()
+                self._container = container
+        except NotFound:
+            logger.warning("Cannot find hdfs container in docker env, skip 
remove.")
+        if self._container is None:
+            image_name = os.environ.get("GRAVITINO_CI_HIVE_DOCKER_IMAGE")
+            if image_name is None:
+                raise GravitinoRuntimeException(
+                    "GRAVITINO_CI_HIVE_DOCKER_IMAGE env variable is not set."
+                )
+            self._container = self._docker_client.containers.run(
+                image=image_name,
+                name=self._container_name,
+                detach=True,
+                environment={"HADOOP_USER_NAME": "datastrato"},
+                network=self._network_name,
+            )
+        asyncio.run(check_hdfs_container_status(self._container))
+
+        self._fetch_ip()
+
+    def _create_networks(self):
+        pool_config = tp.IPAMPool(subnet="10.20.31.16/28")
+        ipam_config = tp.IPAMConfig(driver="default", 
pool_configs=[pool_config])
+        networks = self._docker_client.networks.list()
+        for network in networks:
+            if network.name == self._network_name:
+                self._network = network
+                break
+        if self._network is None:
+            self._network = self._docker_client.networks.create(
+                name=self._network_name, driver="bridge", ipam=ipam_config
+            )
+
+    def _fetch_ip(self):
+        if self._container is None:
+            raise GravitinoRuntimeException("The HDFS container has not init.")
+
+        container_info = 
self._docker_client.api.inspect_container(self._container.id)
+        self._ip = 
container_info["NetworkSettings"]["Networks"][self._network_name][
+            "IPAddress"
+        ]
+
+    def get_ip(self):
+        return self._ip
+
+    def close(self):
+        try:
+            self._container.kill()
+        except RuntimeError as e:
+            logger.warning(
+                "Exception occurred while killing container %s : %s",
+                self._container_name,
+                e,
+            )
+        try:
+            self._container.remove()
+        except RuntimeError as e:
+            logger.warning(
+                "Exception occurred while removing container %s : %s",
+                self._container_name,
+                e,
+            )
+        try:
+            self._network.remove()
+        except RuntimeError as e:
+            logger.warning(
+                "Exception occurred while removing network %s : %s",
+                self._network_name,
+                e,
+            )
diff --git a/clients/client-python/tests/integration/integration_test_env.py 
b/clients/client-python/tests/integration/integration_test_env.py
index a34bba23d..dde2c2410 100644
--- a/clients/client-python/tests/integration/integration_test_env.py
+++ b/clients/client-python/tests/integration/integration_test_env.py
@@ -26,6 +26,8 @@ import sys
 
 import requests
 
+from gravitino.exceptions.gravitino_runtime_exception import 
GravitinoRuntimeException
+
 logger = logging.getLogger(__name__)
 
 
@@ -132,3 +134,87 @@ class IntegrationTestEnv(unittest.TestCase):
 
         if gravitino_server_running:
             logger.error("Can't stop Gravitino server!")
+
+    @classmethod
+    def restart_server(cls):
+        logger.info("Restarting Gravitino server...")
+        gravitino_home = os.environ.get("GRAVITINO_HOME")
+        gravitino_startup_script = os.path.join(gravitino_home, 
"bin/gravitino.sh")
+        if not os.path.exists(gravitino_startup_script):
+            raise GravitinoRuntimeException(
+                f"Can't find Gravitino startup script: 
{gravitino_startup_script}, "
+                "Please execute `./gradlew compileDistribution -x test` in the 
Gravitino "
+                "project root directory."
+            )
+
+        # Restart Gravitino Server
+        env_vars = os.environ.copy()
+        env_vars["HADOOP_USER_NAME"] = "datastrato"
+        result = subprocess.run(
+            [gravitino_startup_script, "restart"],
+            env=env_vars,
+            capture_output=True,
+            text=True,
+            check=False,
+        )
+        if result.stdout:
+            logger.info("stdout: %s", result.stdout)
+        if result.stderr:
+            logger.info("stderr: %s", result.stderr)
+
+        if not check_gravitino_server_status():
+            raise GravitinoRuntimeException("ERROR: Can't start Gravitino 
server!")
+
+    @classmethod
+    def _append_catalog_hadoop_conf(cls, config):
+        logger.info("Append catalog hadoop conf.")
+        gravitino_home = os.environ.get("GRAVITINO_HOME")
+        if gravitino_home is None:
+            raise GravitinoRuntimeException("Cannot find GRAVITINO_HOME env.")
+        hadoop_conf_path = f"{gravitino_home}/catalogs/hadoop/conf/hadoop.conf"
+        if not os.path.exists(hadoop_conf_path):
+            raise GravitinoRuntimeException(
+                f"Hadoop conf file is not found at `{hadoop_conf_path}`."
+            )
+
+        with open(hadoop_conf_path, mode="a", encoding="utf-8") as f:
+            for key, value in config.items():
+                f.write(f"\n{key} = {value}")
+
+    @classmethod
+    def _reset_catalog_hadoop_conf(cls, config):
+        logger.info("Reset catalog hadoop conf.")
+        gravitino_home = os.environ.get("GRAVITINO_HOME")
+        if gravitino_home is None:
+            raise GravitinoRuntimeException("Cannot find GRAVITINO_HOME env.")
+        hadoop_conf_path = f"{gravitino_home}/catalogs/hadoop/conf/hadoop.conf"
+        if not os.path.exists(hadoop_conf_path):
+            raise GravitinoRuntimeException(
+                f"Hadoop conf file is not found at `{hadoop_conf_path}`."
+            )
+        filtered_lines = []
+        with open(hadoop_conf_path, mode="r", encoding="utf-8") as file:
+            origin_lines = file.readlines()
+
+        existed_config = {}
+        for line in origin_lines:
+            line = line.strip()
+            if line.startswith("#"):
+                # append annotations directly
+                filtered_lines.append(line + "\n")
+            else:
+                try:
+                    key, value = line.split("=")
+                    existed_config[key.strip()] = value.strip()
+                except ValueError:
+                    # cannot split to key, value, so just append
+                    filtered_lines.append(line + "\n")
+
+        for key, value in existed_config.items():
+            if config[key] is None:
+                append_line = f"{key} = {value}\n"
+                filtered_lines.append(append_line)
+
+        with open(hadoop_conf_path, mode="w", encoding="utf-8") as file:
+            for line in filtered_lines:
+                file.write(line)
diff --git a/clients/client-python/tests/integration/test_gvfs_with_hdfs.py 
b/clients/client-python/tests/integration/test_gvfs_with_hdfs.py
new file mode 100644
index 000000000..442bbc87e
--- /dev/null
+++ b/clients/client-python/tests/integration/test_gvfs_with_hdfs.py
@@ -0,0 +1,704 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+"""
+
+import logging
+import os
+import platform
+import unittest
+from random import randint
+from typing import Dict
+
+import pandas
+import pyarrow as pa
+import pyarrow.dataset as dt
+import pyarrow.parquet as pq
+from fsspec.implementations.local import LocalFileSystem
+from fsspec.implementations.arrow import ArrowFSWrapper
+from llama_index.core import SimpleDirectoryReader
+from pyarrow.fs import HadoopFileSystem
+from gravitino import (
+    gvfs,
+    NameIdentifier,
+    GravitinoAdminClient,
+    GravitinoClient,
+    Catalog,
+    Fileset,
+)
+from gravitino.exceptions.gravitino_runtime_exception import 
GravitinoRuntimeException
+from tests.integration.integration_test_env import IntegrationTestEnv
+from tests.integration.hdfs_container import HDFSContainer
+from tests.integration.base_hadoop_env import BaseHadoopEnvironment
+
+logger = logging.getLogger(__name__)
+
+DOCKER_TEST = os.environ.get("DOCKER_TEST")
+
+
+#  The Hadoop distribution package does not have native hdfs libraries for 
macOS / Windows systems
+#  (`libhdfs.dylib` for macOS and `libhdfs.dll` for Windows), so the 
integration tests cannot be run
+#  on these two systems at present.
+@unittest.skipIf(
+    DOCKER_TEST == "false" or platform.system() != "Linux",
+    "Skipping tests on non-Linux systems or when DOCKER_TEST=false",
+)
+class TestGvfsWithHDFS(IntegrationTestEnv):
+    hdfs_container: HDFSContainer = None
+    config: Dict = None
+    metalake_name: str = "TestGvfsWithHDFS_metalake" + str(randint(1, 10000))
+    catalog_name: str = "test_gvfs_catalog" + str(randint(1, 10000))
+    catalog_provider: str = "hadoop"
+
+    schema_name: str = "test_gvfs_schema"
+
+    fileset_name: str = "test_gvfs_fileset"
+    fileset_comment: str = "fileset_comment"
+    fileset_storage_location: str = ""
+    fileset_properties_key1: str = "fileset_properties_key1"
+    fileset_properties_value1: str = "fileset_properties_value1"
+    fileset_properties_key2: str = "fileset_properties_key2"
+    fileset_properties_value2: str = "fileset_properties_value2"
+    fileset_properties: Dict[str, str] = {
+        fileset_properties_key1: fileset_properties_value1,
+        fileset_properties_key2: fileset_properties_value2,
+    }
+
+    schema_ident: NameIdentifier = NameIdentifier.of(
+        metalake_name, catalog_name, schema_name
+    )
+    fileset_ident: NameIdentifier = NameIdentifier.of(schema_name, 
fileset_name)
+
+    gravitino_admin_client: GravitinoAdminClient = GravitinoAdminClient(
+        uri="http://localhost:8090"
+    )
+    gravitino_client: GravitinoClient = None
+
+    @classmethod
+    def setUpClass(cls):
+        cls.hdfs_container = HDFSContainer()
+        hdfs_container_ip = cls.hdfs_container.get_ip()
+        # init hadoop env
+        BaseHadoopEnvironment.init_hadoop_env()
+        cls.config = {
+            "gravitino.bypass.fs.defaultFS": f"hdfs://{hdfs_container_ip}:9000"
+        }
+        # append the hadoop conf to server
+        cls._append_catalog_hadoop_conf(cls.config)
+        # restart the server
+        cls.restart_server()
+        # create entity
+        cls._init_test_entities()
+
+    @classmethod
+    def tearDownClass(cls):
+        try:
+            cls._clean_test_data()
+            # reset server conf
+            cls._reset_catalog_hadoop_conf(cls.config)
+            # restart server
+            cls.restart_server()
+            # clear hadoop env
+            BaseHadoopEnvironment.clear_hadoop_env()
+        finally:
+            # close hdfs container
+            cls.hdfs_container.close()
+
+    @classmethod
+    def _init_test_entities(cls):
+        cls.gravitino_admin_client.create_metalake(
+            name=cls.metalake_name, comment="", properties={}
+        )
+        cls.gravitino_client = GravitinoClient(
+            uri="http://localhost:8090", metalake_name=cls.metalake_name
+        )
+        catalog = cls.gravitino_client.create_catalog(
+            name=cls.catalog_name,
+            catalog_type=Catalog.Type.FILESET,
+            provider=cls.catalog_provider,
+            comment="",
+            properties={},
+        )
+        catalog.as_schemas().create_schema(
+            schema_name=cls.schema_name, comment="", properties={}
+        )
+
+        cls.fileset_storage_location: str = (
+            
f"hdfs://{cls.hdfs_container.get_ip()}:9000/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+        )
+        cls.fileset_gvfs_location = (
+            
f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+        )
+        catalog.as_fileset_catalog().create_fileset(
+            ident=cls.fileset_ident,
+            fileset_type=Fileset.Type.MANAGED,
+            comment=cls.fileset_comment,
+            storage_location=cls.fileset_storage_location,
+            properties=cls.fileset_properties,
+        )
+        arrow_hadoop_fs = HadoopFileSystem(host=cls.hdfs_container.get_ip(), 
port=9000)
+        cls.hdfs = ArrowFSWrapper(arrow_hadoop_fs)
+        cls.conf: Dict = {"fs.defaultFS": 
f"hdfs://{cls.hdfs_container.get_ip()}:9000/"}
+
+    @classmethod
+    def _clean_test_data(cls):
+        try:
+            cls.gravitino_client = GravitinoClient(
+                uri="http://localhost:8090", metalake_name=cls.metalake_name
+            )
+            catalog = cls.gravitino_client.load_catalog(name=cls.catalog_name)
+            logger.info(
+                "Drop fileset %s[%s]",
+                cls.fileset_ident,
+                
catalog.as_fileset_catalog().drop_fileset(ident=cls.fileset_ident),
+            )
+            logger.info(
+                "Drop schema %s[%s]",
+                cls.schema_ident,
+                catalog.as_schemas().drop_schema(
+                    schema_name=cls.schema_name, cascade=True
+                ),
+            )
+            logger.info(
+                "Drop catalog %s[%s]",
+                cls.catalog_name,
+                cls.gravitino_client.drop_catalog(name=cls.catalog_name),
+            )
+            logger.info(
+                "Drop metalake %s[%s]",
+                cls.metalake_name,
+                cls.gravitino_admin_client.drop_metalake(cls.metalake_name),
+            )
+        except Exception as e:
+            logger.error("Clean test data failed: %s", e)
+
+    def test_ls(self):
+        ls_dir = self.fileset_gvfs_location + "/test_ls"
+        ls_actual_dir = self.fileset_storage_location + "/test_ls"
+
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090",
+            metalake_name=self.metalake_name,
+            **self.conf,
+        )
+        self.hdfs.mkdir(ls_actual_dir)
+        self.assertTrue(self.hdfs.exists(ls_actual_dir))
+
+        ls_file = self.fileset_gvfs_location + "/test_ls/test.file"
+        ls_actual_file = self.fileset_storage_location + "/test_ls/test.file"
+        self.hdfs.touch(ls_actual_file)
+        self.assertTrue(self.hdfs.exists(ls_actual_file))
+
+        # test detail = false
+        file_list_without_detail = fs.ls(ls_dir, detail=False)
+        self.assertEqual(1, len(file_list_without_detail))
+        self.assertEqual(file_list_without_detail[0], ls_file[len("gvfs://") 
:])
+
+        # test detail = true
+        file_list_with_detail = fs.ls(ls_dir, detail=True)
+        self.assertEqual(1, len(file_list_with_detail))
+        self.assertEqual(file_list_with_detail[0]["name"], 
ls_file[len("gvfs://") :])
+
+    def test_info(self):
+        info_dir = self.fileset_gvfs_location + "/test_info"
+        info_actual_dir = self.fileset_storage_location + "/test_info"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090",
+            metalake_name=self.metalake_name,
+            **self.conf,
+        )
+        self.hdfs.mkdir(info_actual_dir)
+        self.assertTrue(self.hdfs.exists(info_actual_dir))
+
+        info_file = self.fileset_gvfs_location + "/test_info/test.file"
+        info_actual_file = self.fileset_storage_location + 
"/test_info/test.file"
+        self.hdfs.touch(info_actual_file)
+        self.assertTrue(self.hdfs.exists(info_actual_file))
+
+        dir_info = fs.info(info_dir)
+        self.assertEqual(dir_info["name"], info_dir[len("gvfs://") :])
+
+        file_info = fs.info(info_file)
+        self.assertEqual(file_info["name"], info_file[len("gvfs://") :])
+
+    def test_exist(self):
+        exist_dir = self.fileset_gvfs_location + "/test_exist"
+        exist_actual_dir = self.fileset_storage_location + "/test_exist"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090",
+            metalake_name=self.metalake_name,
+            **self.conf,
+        )
+        self.hdfs.mkdir(exist_actual_dir)
+        self.assertTrue(self.hdfs.exists(exist_actual_dir))
+        self.assertTrue(fs.exists(exist_dir))
+
+        exist_file = self.fileset_gvfs_location + "/test_exist/test.file"
+        exist_actual_file = self.fileset_storage_location + 
"/test_exist/test.file"
+        self.hdfs.touch(exist_actual_file)
+        self.assertTrue(self.hdfs.exists(exist_actual_file))
+        self.assertTrue(fs.exists(exist_file))
+
+    def test_cp_file(self):
+        cp_file_dir = self.fileset_gvfs_location + "/test_cp_file"
+        cp_file_actual_dir = self.fileset_storage_location + "/test_cp_file"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090",
+            metalake_name=self.metalake_name,
+            **self.conf,
+        )
+        self.hdfs.mkdir(cp_file_actual_dir)
+        self.assertTrue(self.hdfs.exists(cp_file_actual_dir))
+        self.assertTrue(fs.exists(cp_file_dir))
+
+        cp_file_file = self.fileset_gvfs_location + "/test_cp_file/test.file"
+        cp_file_actual_file = self.fileset_storage_location + 
"/test_cp_file/test.file"
+        self.hdfs.touch(cp_file_actual_file)
+        self.assertTrue(self.hdfs.exists(cp_file_actual_file))
+        self.assertTrue(fs.exists(cp_file_file))
+
+        with self.hdfs.open(cp_file_actual_file, "wb") as f:
+            f.write(b"test_file_1")
+
+        cp_file_new_file = self.fileset_gvfs_location + 
"/test_cp_file/test_cp.file"
+        cp_file_new_actual_file = (
+            self.fileset_storage_location + "/test_cp_file/test_cp.file"
+        )
+        fs.cp_file(cp_file_file, cp_file_new_file)
+        self.assertTrue(fs.exists(cp_file_new_file))
+
+        with self.hdfs.open(cp_file_new_actual_file, "rb") as f:
+            result = f.read()
+        self.assertEqual(b"test_file_1", result)
+
+    def test_mv(self):
+        """Move a file between two fileset dirs via GVFS and check both ends."""
+        mv_dir = self.fileset_gvfs_location + "/test_mv"
+        mv_actual_dir = self.fileset_storage_location + "/test_mv"
+        # A single client serves both directories below.
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090",
+            metalake_name=self.metalake_name,
+            **self.conf,
+        )
+        self.hdfs.mkdir(mv_actual_dir)
+        self.assertTrue(self.hdfs.exists(mv_actual_dir))
+        self.assertTrue(fs.exists(mv_dir))
+
+        mv_new_dir = self.fileset_gvfs_location + "/test_mv_new"
+        mv_new_actual_dir = self.fileset_storage_location + "/test_mv_new"
+        self.hdfs.mkdir(mv_new_actual_dir)
+        self.assertTrue(self.hdfs.exists(mv_new_actual_dir))
+        self.assertTrue(fs.exists(mv_new_dir))
+
+        mv_file = self.fileset_gvfs_location + "/test_mv/test.file"
+        mv_actual_file = self.fileset_storage_location + "/test_mv/test.file"
+        self.hdfs.touch(mv_actual_file)
+        self.assertTrue(self.hdfs.exists(mv_actual_file))
+        self.assertTrue(fs.exists(mv_file))
+
+        mv_new_file = self.fileset_gvfs_location + "/test_mv_new/test_new.file"
+        mv_new_actual_file = (
+            self.fileset_storage_location + "/test_mv_new/test_new.file"
+        )
+
+        fs.mv(mv_file, mv_new_file)
+        self.assertTrue(fs.exists(mv_new_file))
+        self.assertTrue(self.hdfs.exists(mv_new_actual_file))
+        # A move (unlike a copy) must remove the source.
+        self.assertFalse(fs.exists(mv_file))
+        self.assertFalse(self.hdfs.exists(mv_actual_file))
+    def test_rm(self):
+        """Delete a file, then a non-empty directory, through GVFS ``rm``."""
+        rm_dir = self.fileset_gvfs_location + "/test_rm"
+        rm_actual_dir = self.fileset_storage_location + "/test_rm"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090",
+            metalake_name=self.metalake_name,
+            **self.conf,
+        )
+        self.hdfs.mkdir(rm_actual_dir)
+        self.assertTrue(self.hdfs.exists(rm_actual_dir))
+        self.assertTrue(fs.exists(rm_dir))
+
+        rm_file = self.fileset_gvfs_location + "/test_rm/test.file"
+        rm_actual_file = self.fileset_storage_location + "/test_rm/test.file"
+        # The raw HDFS client must be given the storage path, not the virtual
+        # gvfs:// path; the assertion below checks the storage path.
+        self.hdfs.touch(rm_actual_file)
+        self.assertTrue(self.hdfs.exists(rm_actual_file))
+        self.assertTrue(fs.exists(rm_file))
+
+        # test delete file
+        fs.rm(rm_file)
+        self.assertFalse(fs.exists(rm_file))
+
+        # test delete dir with recursive = false: a non-empty dir must be
+        # rejected with ValueError.
+        rm_new_file = self.fileset_gvfs_location + "/test_rm/test_new.file"
+        rm_new_actual_file = (
+            self.fileset_storage_location + "/test_rm/test_new.file"
+        )
+        self.hdfs.touch(rm_new_actual_file)
+        self.assertTrue(self.hdfs.exists(rm_new_actual_file))
+        self.assertTrue(fs.exists(rm_new_file))
+        with self.assertRaises(ValueError):
+            fs.rm(rm_dir, recursive=False)
+
+        # test delete dir with recursive = true
+        fs.rm(rm_dir, recursive=True)
+        self.assertFalse(fs.exists(rm_dir))
+    def test_rm_file(self):
+        """``rm_file`` deletes a file and refuses (OSError) to delete a dir."""
+        rm_file_dir = self.fileset_gvfs_location + "/test_rm_file"
+        rm_file_actual_dir = self.fileset_storage_location + "/test_rm_file"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090",
+            metalake_name=self.metalake_name,
+            **self.conf,
+        )
+        self.hdfs.mkdir(rm_file_actual_dir)
+        self.assertTrue(self.hdfs.exists(rm_file_actual_dir))
+        self.assertTrue(fs.exists(rm_file_dir))
+
+        rm_file_file = self.fileset_gvfs_location + "/test_rm_file/test.file"
+        rm_file_actual_file = (
+            self.fileset_storage_location + "/test_rm_file/test.file"
+        )
+        self.hdfs.touch(rm_file_actual_file)
+        self.assertTrue(self.hdfs.exists(rm_file_actual_file))
+        self.assertTrue(fs.exists(rm_file_file))
+
+        # test delete file
+        fs.rm_file(rm_file_file)
+        self.assertFalse(fs.exists(rm_file_file))
+
+        # test delete dir: rm_file only accepts files
+        with self.assertRaises(OSError):
+            fs.rm_file(rm_file_dir)
+    def test_rmdir(self):
+        """``rmdir`` refuses (OSError) a file path and removes a directory."""
+        rmdir_dir = self.fileset_gvfs_location + "/test_rmdir"
+        rmdir_actual_dir = self.fileset_storage_location + "/test_rmdir"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090",
+            metalake_name=self.metalake_name,
+            **self.conf,
+        )
+        self.hdfs.mkdir(rmdir_actual_dir)
+        self.assertTrue(self.hdfs.exists(rmdir_actual_dir))
+        self.assertTrue(fs.exists(rmdir_dir))
+
+        rmdir_file = self.fileset_gvfs_location + "/test_rmdir/test.file"
+        rmdir_actual_file = (
+            self.fileset_storage_location + "/test_rmdir/test.file"
+        )
+        self.hdfs.touch(rmdir_actual_file)
+        self.assertTrue(self.hdfs.exists(rmdir_actual_file))
+        self.assertTrue(fs.exists(rmdir_file))
+
+        # test delete file
+        with self.assertRaises(OSError):
+            fs.rmdir(rmdir_file)
+
+        # test delete dir
+        # NOTE(review): the dir still contains test.file here, so this relies
+        # on the backing HDFS rmdir removing non-empty dirs — confirm intent.
+        fs.rmdir(rmdir_dir)
+        self.assertFalse(fs.exists(rmdir_dir))
+    def test_open(self):
+        """Write then read back a file through ``fs.open``."""
+        open_dir = self.fileset_gvfs_location + "/test_open"
+        open_actual_dir = self.fileset_storage_location + "/test_open"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090",
+            metalake_name=self.metalake_name,
+            **self.conf,
+        )
+        self.hdfs.mkdir(open_actual_dir)
+        self.assertTrue(self.hdfs.exists(open_actual_dir))
+        self.assertTrue(fs.exists(open_dir))
+
+        open_file = self.fileset_gvfs_location + "/test_open/test.file"
+        open_actual_file = self.fileset_storage_location + "/test_open/test.file"
+        self.hdfs.touch(open_actual_file)
+        self.assertTrue(self.hdfs.exists(open_actual_file))
+        self.assertTrue(fs.exists(open_file))
+
+        # test open and write file
+        with fs.open(open_file, mode="wb") as f:
+            f.write(b"test_open_write")
+        self.assertTrue(fs.info(open_file)["size"] > 0)
+
+        # test open and read file
+        with fs.open(open_file, mode="rb") as f:
+            self.assertEqual(b"test_open_write", f.read())
+    def test_mkdir(self):
+        """``mkdir`` honours ``create_parents`` for missing intermediate dirs."""
+        mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
+        mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090",
+            metalake_name=self.metalake_name,
+            **self.conf,
+        )
+        fs.mkdir(mkdir_dir)
+        self.assertTrue(fs.exists(mkdir_dir))
+        self.assertTrue(self.hdfs.exists(mkdir_actual_dir))
+
+        # test mkdir dir with create_parents = false
+        parent_not_exist_virtual_path = mkdir_dir + "/not_exist/sub_dir"
+        self.assertFalse(fs.exists(parent_not_exist_virtual_path))
+
+        with self.assertRaises(OSError):
+            fs.mkdir(parent_not_exist_virtual_path, create_parents=False)
+
+        # test mkdir dir with create_parents = true on the same path; first
+        # confirm the failed call above left nothing behind.
+        self.assertFalse(fs.exists(parent_not_exist_virtual_path))
+
+        fs.mkdir(parent_not_exist_virtual_path, create_parents=True)
+        self.assertTrue(fs.exists(parent_not_exist_virtual_path))
+    def test_makedirs(self):
+        """``makedirs`` creates a dir and any missing intermediate dirs."""
+        makedirs_dir = self.fileset_gvfs_location + "/test_makedirs"
+        makedirs_actual_dir = self.fileset_storage_location + "/test_makedirs"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090",
+            metalake_name=self.metalake_name,
+            **self.conf,
+        )
+        fs.makedirs(makedirs_dir)
+        self.assertTrue(fs.exists(makedirs_dir))
+        self.assertTrue(self.hdfs.exists(makedirs_actual_dir))
+
+        # test makedirs with missing intermediate dirs
+        parent_not_exist_virtual_path = makedirs_dir + "/not_exist/sub_dir"
+        self.assertFalse(fs.exists(parent_not_exist_virtual_path))
+        fs.makedirs(parent_not_exist_virtual_path)
+        self.assertTrue(fs.exists(parent_not_exist_virtual_path))
+    def test_created(self):
+        """``created`` on a GVFS dir is expected to raise GravitinoRuntimeException."""
+        created_dir = self.fileset_gvfs_location + "/test_created"
+        created_actual_dir = self.fileset_storage_location + "/test_created"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090",
+            metalake_name=self.metalake_name,
+            **self.conf,
+        )
+        self.hdfs.mkdir(created_actual_dir)
+        self.assertTrue(self.hdfs.exists(created_actual_dir))
+        self.assertTrue(fs.exists(created_dir))
+
+        with self.assertRaises(GravitinoRuntimeException):
+            fs.created(created_dir)
+    def test_modified(self):
+        """``modified`` returns a value for an existing GVFS directory."""
+        modified_dir = self.fileset_gvfs_location + "/test_modified"
+        modified_actual_dir = self.fileset_storage_location + "/test_modified"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090",
+            metalake_name=self.metalake_name,
+            **self.conf,
+        )
+        self.hdfs.mkdir(modified_actual_dir)
+        self.assertTrue(self.hdfs.exists(modified_actual_dir))
+        self.assertTrue(fs.exists(modified_dir))
+
+        # test modified time of a dir which exists
+        self.assertIsNotNone(fs.modified(modified_dir))
+    def test_cat_file(self):
+        """``cat_file`` returns the full byte content written via ``fs.open``."""
+        cat_dir = self.fileset_gvfs_location + "/test_cat"
+        cat_actual_dir = self.fileset_storage_location + "/test_cat"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090",
+            metalake_name=self.metalake_name,
+            **self.conf,
+        )
+        self.hdfs.mkdir(cat_actual_dir)
+        self.assertTrue(self.hdfs.exists(cat_actual_dir))
+        self.assertTrue(fs.exists(cat_dir))
+
+        cat_file = self.fileset_gvfs_location + "/test_cat/test.file"
+        cat_actual_file = self.fileset_storage_location + "/test_cat/test.file"
+        self.hdfs.touch(cat_actual_file)
+        self.assertTrue(self.hdfs.exists(cat_actual_file))
+        self.assertTrue(fs.exists(cat_file))
+
+        # test open and write file
+        with fs.open(cat_file, mode="wb") as f:
+            f.write(b"test_cat_file")
+        self.assertTrue(fs.info(cat_file)["size"] > 0)
+
+        # test cat file
+        content = fs.cat_file(cat_file)
+        self.assertEqual(b"test_cat_file", content)
+    def test_get_file(self):
+        """``get_file`` downloads to local disk and rejects a GVFS destination."""
+        get_dir = self.fileset_gvfs_location + "/test_get"
+        get_actual_dir = self.fileset_storage_location + "/test_get"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090",
+            metalake_name=self.metalake_name,
+            **self.conf,
+        )
+        self.hdfs.mkdir(get_actual_dir)
+        self.assertTrue(self.hdfs.exists(get_actual_dir))
+        self.assertTrue(fs.exists(get_dir))
+
+        get_file = self.fileset_gvfs_location + "/test_get/test.file"
+        get_actual_file = self.fileset_storage_location + "/test_get/test.file"
+        self.hdfs.touch(get_actual_file)
+        self.assertTrue(self.hdfs.exists(get_actual_file))
+        self.assertTrue(fs.exists(get_file))
+
+        # test open and write file
+        with fs.open(get_file, mode="wb") as f:
+            f.write(b"test_get_file")
+        self.assertTrue(fs.info(get_file)["size"] > 0)
+
+        # test get file
+        local_fs = LocalFileSystem()
+        local_dir = "/tmp/test_gvfs_local_file_" + str(randint(1, 10000))
+        local_fs.makedirs(local_dir)
+        try:
+            local_path = local_dir + "/get_file.txt"
+            local_fs.touch(local_path)
+            self.assertTrue(local_fs.exists(local_path))
+            fs.get_file(get_file, local_path)
+            self.assertEqual(b"test_get_file", local_fs.cat_file(local_path))
+        finally:
+            # Remove the local scratch dir even when an assertion above fails.
+            local_fs.rm(local_dir, recursive=True)
+
+        # test get a file to a remote file
+        remote_path = self.fileset_gvfs_location + "/test_file_2.par"
+        with self.assertRaises(GravitinoRuntimeException):
+            fs.get_file(get_file, remote_path)
+    def test_pandas(self):
+        """Round-trip a DataFrame through GVFS as parquet and CSV."""
+        pandas_dir = self.fileset_gvfs_location + "/test_pandas"
+        pandas_actual_dir = self.fileset_storage_location + "/test_pandas"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090",
+            metalake_name=self.metalake_name,
+            **self.conf,
+        )
+        self.hdfs.mkdir(pandas_actual_dir)
+        self.assertTrue(self.hdfs.exists(pandas_actual_dir))
+        self.assertTrue(fs.exists(pandas_dir))
+
+        data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21, 19, 18]})
+        # to parquet, handing pandas the filesystem object directly
+        parquet_file = self.fileset_gvfs_location + "/test_pandas/test.parquet"
+        parquet_actual_file = (
+            self.fileset_storage_location + "/test_pandas/test.parquet"
+        )
+        data.to_parquet(parquet_file, filesystem=fs)
+        self.assertTrue(fs.exists(parquet_file))
+        self.assertTrue(self.hdfs.exists(parquet_actual_file))
+
+        # read parquet
+        ds1 = pandas.read_parquet(path=parquet_file, filesystem=fs)
+        self.assertTrue(data.equals(ds1))
+
+        # For CSV, pandas resolves the gvfs:// URL itself and builds the
+        # filesystem from these options.
+        # NOTE(review): unlike `fs` above, these options omit **self.conf —
+        # confirm that is intended.
+        storage_options = {
+            "server_uri": "http://localhost:8090",
+            "metalake_name": self.metalake_name,
+        }
+        # to csv
+        csv_file = self.fileset_gvfs_location + "/test_pandas/test.csv"
+        csv_actual_file = self.fileset_storage_location + "/test_pandas/test.csv"
+        data.to_csv(
+            csv_file,
+            index=False,
+            storage_options=storage_options,
+        )
+        self.assertTrue(fs.exists(csv_file))
+        self.assertTrue(self.hdfs.exists(csv_actual_file))
+
+        # read csv
+        ds2 = pandas.read_csv(csv_file, storage_options=storage_options)
+        self.assertTrue(data.equals(ds2))
+    def test_pyarrow(self):
+        """Read a GVFS parquet file back as pyarrow dataset and parquet table."""
+        pyarrow_dir = self.fileset_gvfs_location + "/test_pyarrow"
+        pyarrow_actual_dir = self.fileset_storage_location + "/test_pyarrow"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090",
+            metalake_name=self.metalake_name,
+            **self.conf,
+        )
+        self.hdfs.mkdir(pyarrow_actual_dir)
+        self.assertTrue(self.hdfs.exists(pyarrow_actual_dir))
+        self.assertTrue(fs.exists(pyarrow_dir))
+
+        data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21, 19, 18]})
+        # to parquet
+        parquet_file = pyarrow_dir + "/test.parquet"
+        data.to_parquet(parquet_file, filesystem=fs)
+        self.assertTrue(fs.exists(parquet_file))
+
+        # read as arrow dataset
+        arrow_dataset = dt.dataset(parquet_file, filesystem=fs)
+        arrow_tb_1 = arrow_dataset.to_table()
+
+        # reference table built from the in-memory frame
+        arrow_tb_2 = pa.Table.from_pandas(data)
+        self.assertTrue(arrow_tb_1.equals(arrow_tb_2))
+
+        # read as arrow parquet dataset
+        arrow_tb_3 = pq.read_table(parquet_file, filesystem=fs)
+        self.assertTrue(arrow_tb_3.equals(arrow_tb_2))
+    def test_llama_index(self):
+        """llama_index's SimpleDirectoryReader loads CSVs recursively from GVFS."""
+        llama_dir = self.fileset_gvfs_location + "/test_llama"
+        llama_actual_dir = self.fileset_storage_location + "/test_llama"
+        fs = gvfs.GravitinoVirtualFileSystem(
+            server_uri="http://localhost:8090",
+            metalake_name=self.metalake_name,
+            **self.conf,
+        )
+        self.hdfs.mkdir(llama_actual_dir)
+        self.assertTrue(self.hdfs.exists(llama_actual_dir))
+        self.assertTrue(fs.exists(llama_dir))
+        data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21, 19, 18]})
+
+        storage_options = {
+            "server_uri": "http://localhost:8090",
+            "metalake_name": self.metalake_name,
+        }
+        csv_file = llama_dir + "/test.csv"
+        # to csv
+        data.to_csv(
+            csv_file,
+            index=False,
+            storage_options=storage_options,
+        )
+        self.assertTrue(fs.exists(csv_file))
+        # a second copy in a sub dir exercises recursive discovery
+        another_csv_file = llama_dir + "/sub_dir/test1.csv"
+        data.to_csv(
+            another_csv_file,
+            index=False,
+            storage_options=storage_options,
+        )
+        self.assertTrue(fs.exists(another_csv_file))
+
+        # The reader takes the path without the gvfs:// scheme plus the fs.
+        reader = SimpleDirectoryReader(
+            input_dir=llama_dir[len("gvfs://") :],
+            fs=fs,
+            recursive=True,  # recursively searches all subdirectories
+        )
+        documents = reader.load_data()
+        self.assertEqual(len(documents), 2)
+        # Both CSVs hold the same frame, so every document must parse back to
+        # exactly these rows; unexpected names now fail instead of passing.
+        expected = {"A": "20", "B": "21", "C": "19", "D": "18"}
+        for doc in documents:
+            rows = [line.strip().split(", ") for line in doc.text.split("\n")]
+            self.assertEqual(4, len(rows))
+            self.assertEqual(expected, {row[0]: row[1] for row in rows})
diff --git a/clients/client-python/tests/integration/test_simple_auth_client.py b/clients/client-python/tests/integration/test_simple_auth_client.py
index 516db2add..a4ed77fe1 100644
--- a/clients/client-python/tests/integration/test_simple_auth_client.py
+++ b/clients/client-python/tests/integration/test_simple_auth_client.py
@@ -19,7 +19,6 @@ under the License.
 
 import logging
 import os
-import unittest
 from random import randint
 from typing import Dict
 
@@ -31,11 +30,12 @@ from gravitino import (
     Fileset,
 )
 from gravitino.auth.simple_auth_provider import SimpleAuthProvider
+from tests.integration.integration_test_env import IntegrationTestEnv
 
 logger = logging.getLogger(__name__)
 
 
-class TestSimpleAuthClient(unittest.TestCase):
+class TestSimpleAuthClient(IntegrationTestEnv):
     creator: str = "test_client"
     metalake_name: str = "TestClient_metalake" + str(randint(1, 10000))
     catalog_name: str = "fileset_catalog"

Reply via email to