This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 4312b632b [#3760] improvement(client-python): Add Docker env and
PyGVFS Integration tests (#3876)
4312b632b is described below
commit 4312b632b225b5a5a2ea6e0f0bdcbdd3092a1db8
Author: xloya <[email protected]>
AuthorDate: Fri Jul 5 11:26:27 2024 +0800
[#3760] improvement(client-python): Add Docker env and PyGVFS Integration
tests (#3876)
### What changes were proposed in this pull request?
Add Hive Docker env for client-python, and add integration tests for
PyGVFS + HDFS. Depends on #3528.
### Why are the changes needed?
Fix: #3760
### How was this patch tested?
Add some ITs.
---------
Co-authored-by: xiaojiebao <[email protected]>
---
.github/workflows/python-integration-test.yml | 2 +-
clients/client-python/build.gradle.kts | 55 +-
clients/client-python/requirements-dev.txt | 3 +-
.../tests/integration/base_hadoop_env.py | 101 +++
.../tests/integration/hdfs_container.py | 158 +++++
.../tests/integration/integration_test_env.py | 86 +++
.../tests/integration/test_gvfs_with_hdfs.py | 704 +++++++++++++++++++++
.../tests/integration/test_simple_auth_client.py | 4 +-
8 files changed, 1098 insertions(+), 15 deletions(-)
diff --git a/.github/workflows/python-integration-test.yml
b/.github/workflows/python-integration-test.yml
index f2e5fd4ed..a7ffacfd7 100644
--- a/.github/workflows/python-integration-test.yml
+++ b/.github/workflows/python-integration-test.yml
@@ -66,7 +66,7 @@ jobs:
for pythonVersion in "3.8" "3.9" "3.10" "3.11"
do
echo "Use Python version ${pythonVersion} to test the Python
client."
- ./gradlew -PjdkVersion=${{ matrix.java-version }}
-PpythonVersion=${pythonVersion} :clients:client-python:test
+ ./gradlew -PjdkVersion=${{ matrix.java-version }}
-PpythonVersion=${pythonVersion} -PskipDockerTests=false
:clients:client-python:test
# Clean Gravitino database to clean test data
rm -rf ./distribution/package/data
done
diff --git a/clients/client-python/build.gradle.kts
b/clients/client-python/build.gradle.kts
index 68cc897e5..2cf83c376 100644
--- a/clients/client-python/build.gradle.kts
+++ b/clients/client-python/build.gradle.kts
@@ -16,12 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
+import de.undercouch.gradle.tasks.download.Download
+import de.undercouch.gradle.tasks.download.Verify
import io.github.piyushroshan.python.VenvTask
import java.net.HttpURLConnection
import java.net.URL
plugins {
id("io.github.piyushroshan.python-gradle-miniforge-plugin") version "1.0.0"
+ id("de.undercouch.download") version "5.6.0"
}
pythonPlugin {
@@ -148,6 +151,10 @@ fun generatePypiProjectHomePage() {
}
}
// Hadoop distribution that the docker-based Python integration tests download and verify.
// Every name below is derived from the version, so bumping it only needs one edit.
val hadoopVersion = "2.7.3"
val hadoopDirName = "hadoop-$hadoopVersion"
val hadoopPackName = "$hadoopDirName.tar.gz"
val hadoopDownloadUrl = "https://archive.apache.org/dist/hadoop/core/$hadoopDirName/$hadoopPackName"
tasks {
val pipInstall by registering(VenvTask::class) {
venvExec = "pip"
@@ -173,6 +180,26 @@ tasks {
workingDir = projectDir.resolve("./tests/integration")
}
+ val build by registering(VenvTask::class) {
+ dependsOn(pylint)
+ venvExec = "python"
+ args = listOf("scripts/generate_version.py")
+ }
+
+ val downloadHadoopPack by registering(Download::class) {
+ dependsOn(build)
+ onlyIfModified(true)
+ src(hadoopDownloadUrl)
+ dest(layout.buildDirectory.dir("tmp"))
+ }
+
+ val verifyHadoopPack by registering(Verify::class) {
+ dependsOn(downloadHadoopPack)
+ src(layout.buildDirectory.file("tmp/${hadoopPackName}"))
+ algorithm("MD5")
+ checksum("3455bb57e4b4906bbea67b58cca78fa8")
+ }
+
val integrationTest by registering(VenvTask::class) {
doFirst {
gravitinoServer("start")
@@ -181,11 +208,23 @@ tasks {
venvExec = "coverage"
args = listOf("run", "--branch", "-m", "unittest")
workingDir = projectDir.resolve("./tests/integration")
- environment = mapOf(
- "PROJECT_VERSION" to project.version,
- "GRAVITINO_HOME" to project.rootDir.path + "/distribution/package",
- "START_EXTERNAL_GRAVITINO" to "true"
- )
+ val dockerTest = project.rootProject.extra["dockerTest"] as? Boolean ?:
false
+ val envMap = mapOf<String, Any>().toMutableMap()
+ if (dockerTest) {
+ dependsOn("verifyHadoopPack")
+ envMap.putAll(mapOf(
+ "HADOOP_VERSION" to hadoopVersion,
+ "PYTHON_BUILD_PATH" to project.rootDir.path +
"/clients/client-python/build"
+ ))
+ }
+ envMap.putAll(mapOf(
+ "PROJECT_VERSION" to project.version,
+ "GRAVITINO_HOME" to project.rootDir.path + "/distribution/package",
+ "START_EXTERNAL_GRAVITINO" to "true",
+ "DOCKER_TEST" to dockerTest.toString(),
+ "GRAVITINO_CI_HIVE_DOCKER_IMAGE" to
"datastrato/gravitino-ci-hive:0.1.12",
+ ))
+ environment = envMap
doLast {
gravitinoServer("stop")
@@ -224,12 +263,6 @@ tasks {
}
}
- val build by registering(VenvTask::class) {
- dependsOn(pylint)
- venvExec = "python"
- args = listOf("scripts/generate_version.py")
- }
-
val pydoc by registering(VenvTask::class) {
venvExec = "python"
args = listOf("scripts/generate_doc.py")
diff --git a/clients/client-python/requirements-dev.txt
b/clients/client-python/requirements-dev.txt
index 77387c01c..06f634358 100644
--- a/clients/client-python/requirements-dev.txt
+++ b/clients/client-python/requirements-dev.txt
@@ -26,4 +26,5 @@ pyarrow==15.0.2
llama-index==0.10.40
tenacity==8.3.0
cachetools==5.3.3
-readerwriterlock==1.0.9
\ No newline at end of file
+readerwriterlock==1.0.9
+docker==7.1.0
\ No newline at end of file
diff --git a/clients/client-python/tests/integration/base_hadoop_env.py
b/clients/client-python/tests/integration/base_hadoop_env.py
new file mode 100644
index 000000000..694331078
--- /dev/null
+++ b/clients/client-python/tests/integration/base_hadoop_env.py
@@ -0,0 +1,101 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+"""
+
+import logging
+import os
+import shutil
+import subprocess
+import tarfile
+
+from gravitino.exceptions.gravitino_runtime_exception import
GravitinoRuntimeException
+
+logger = logging.getLogger(__name__)
+
+HADOOP_VERSION = os.environ.get("HADOOP_VERSION")
+PYTHON_BUILD_PATH = os.environ.get("PYTHON_BUILD_PATH")
+
+
class BaseHadoopEnvironment:
    """Prepares and removes a local Hadoop distribution for the HDFS integration tests.

    All paths derive from the module-level ``PYTHON_BUILD_PATH`` and ``HADOOP_VERSION``
    values, which are read from the environment:

    * the tarball is read from ``{PYTHON_BUILD_PATH}/tmp/hadoop-{HADOOP_VERSION}.tar.gz``;
    * it is extracted into ``{PYTHON_BUILD_PATH}/hadoop``.
    """

    @classmethod
    def init_hadoop_env(cls):
        """Extract the Hadoop tarball and export the Hadoop environment variables.

        Raises:
            GravitinoRuntimeException: if the required env variables are missing, or if
                extraction or the ``hadoop classpath`` command fails.
        """
        cls._check_env_vars()
        cls._unzip_hadoop_pack()
        # configure hadoop env
        cls._configure_hadoop_environment()

    @classmethod
    def clear_hadoop_env(cls):
        """Delete the extracted Hadoop directory.

        Raises:
            GravitinoRuntimeException: if the directory cannot be removed.
        """
        hadoop_dir = f"{PYTHON_BUILD_PATH}/hadoop"
        try:
            shutil.rmtree(hadoop_dir)
        except Exception as e:
            raise GravitinoRuntimeException(
                f"Failed to delete dir '{hadoop_dir}': {e}"
            ) from e

    @classmethod
    def _check_env_vars(cls):
        """Fail fast when HADOOP_VERSION or PYTHON_BUILD_PATH is not set."""
        # Without this guard every derived path silently contains the literal "None".
        if not HADOOP_VERSION or not PYTHON_BUILD_PATH:
            raise GravitinoRuntimeException(
                "HADOOP_VERSION and PYTHON_BUILD_PATH env variables must be set."
            )

    @classmethod
    def _unzip_hadoop_pack(cls):
        """Extract the Hadoop tarball, replacing any previous extraction.

        Raises:
            GravitinoRuntimeException: if the old directory cannot be deleted or the
                tarball cannot be extracted.
        """
        hadoop_pack = f"{PYTHON_BUILD_PATH}/tmp/hadoop-{HADOOP_VERSION}.tar.gz"
        unzip_dir = f"{PYTHON_BUILD_PATH}/hadoop"
        logger.info("Unzip hadoop pack from %s.", hadoop_pack)
        # Remove a stale extraction first so the result matches the tarball exactly.
        if os.path.exists(unzip_dir):
            try:
                shutil.rmtree(unzip_dir)
            except Exception as e:
                raise GravitinoRuntimeException(
                    f"Failed to delete dir '{unzip_dir}': {e}"
                ) from e
        try:
            # NOTE(review): extractall() without a member filter is only acceptable because
            # this archive is trusted (the Gradle build checks its MD5 before the tests run).
            with tarfile.open(hadoop_pack) as tar:
                tar.extractall(path=unzip_dir)
        except Exception as e:
            raise GravitinoRuntimeException(
                f"Failed to extract file '{hadoop_pack}': {e}"
            ) from e

    @classmethod
    def _configure_hadoop_environment(cls):
        """Export HADOOP_USER_NAME, HADOOP_HOME, HADOOP_CONF_DIR and CLASSPATH.

        CLASSPATH is set to the output of ``hadoop classpath --glob``.

        Raises:
            GravitinoRuntimeException: if ``hadoop classpath`` exits with a non-zero code.
        """
        logger.info("Configure hadoop environment.")
        hadoop_home = f"{PYTHON_BUILD_PATH}/hadoop/hadoop-{HADOOP_VERSION}"
        # os.putenv changes the process-level environment, which child processes and
        # native code inherit. It does NOT update os.environ, so these values are absent
        # from os.environ.copy().
        os.putenv("HADOOP_USER_NAME", "datastrato")
        os.putenv("HADOOP_HOME", hadoop_home)
        os.putenv("HADOOP_CONF_DIR", f"{hadoop_home}/etc/hadoop")
        hadoop_shell_path = f"{hadoop_home}/bin/hadoop"
        # Get the classpath. check=True turns a non-zero exit status into
        # CalledProcessError, so a successful return always means exit code 0.
        try:
            result = subprocess.run(
                [hadoop_shell_path, "classpath", "--glob"],
                capture_output=True,
                text=True,
                check=True,
            )
        except subprocess.CalledProcessError as e:
            raise GravitinoRuntimeException(
                f"Command failed with return code {e.returncode}, stderr:{e.stderr}"
            ) from e
        os.putenv("CLASSPATH", str(result.stdout))
diff --git a/clients/client-python/tests/integration/hdfs_container.py
b/clients/client-python/tests/integration/hdfs_container.py
new file mode 100644
index 000000000..34bc41d86
--- /dev/null
+++ b/clients/client-python/tests/integration/hdfs_container.py
@@ -0,0 +1,158 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+"""
+
+import asyncio
+import logging
+import os
+import time
+
+import docker
+from docker import types as tp
+from docker.errors import NotFound
+
+from gravitino.exceptions.gravitino_runtime_exception import
GravitinoRuntimeException
+
+logger = logging.getLogger(__name__)
+
+
async def check_hdfs_status(hdfs_container):
    """Poll the HDFS container until its status-check script succeeds.

    Runs ``bash /tmp/check-status.sh`` inside the container up to ``retry_limit`` times.
    Failed attempts are separated by ``retry_interval_sec`` seconds. After each failure the
    output of ``hdfs dfsadmin -report`` is logged to help diagnose start-up problems.

    Args:
        hdfs_container: a docker-py container object exposing ``exec_run``.

    Returns:
        True as soon as the check script exits with code 0; False if every attempt failed.
    """
    retry_limit = 15
    retry_interval_sec = 10
    command_and_args = ["bash", "/tmp/check-status.sh"]
    started_at = time.monotonic()
    for attempt in range(retry_limit):
        try:
            exec_result = hdfs_container.exec_run(command_and_args)
            if exec_result.exit_code == 0:
                logger.info(
                    "HDFS startup successfully! (%.1fs elapsed)",
                    time.monotonic() - started_at,
                )
                return True
            message = (
                f"Command {command_and_args} exited with {exec_result.exit_code}"
            )
            logger.warning(message)
            logger.warning("output: %s", exec_result.output)
            output_status_command = ["hdfs", "dfsadmin", "-report"]
            exec_result = hdfs_container.exec_run(output_status_command)
            logger.info("HDFS report, output: %s", exec_result.output)
        except Exception as e:
            logger.error(
                "Exception occurred while checking HDFS container status: %s", e
            )
        # Yield to the event loop instead of blocking it. A blocking time.sleep() here would
        # stop asyncio.wait_for() in the caller from ever enforcing its timeout.
        # No pause is needed after the final attempt.
        if attempt < retry_limit - 1:
            await asyncio.sleep(retry_interval_sec)
    return False
+
+
async def check_hdfs_container_status(hdfs_container):
    """Wait, at most ``timeout_sec`` seconds, for the HDFS container to report healthy.

    Args:
        hdfs_container: a docker-py container object, passed on to ``check_hdfs_status``.

    Raises:
        GravitinoRuntimeException: if the check times out or all of its retries fail.
    """
    timeout_sec = 150
    try:
        result = await asyncio.wait_for(
            check_hdfs_status(hdfs_container), timeout=timeout_sec
        )
    except asyncio.TimeoutError as e:
        raise GravitinoRuntimeException(
            "Timeout occurred while waiting for checking HDFS container status."
        ) from e
    # Raise explicitly rather than `assert`, which is stripped when Python runs with -O.
    if result is not True:
        raise GravitinoRuntimeException("HDFS container startup failed!")
+
+
class HDFSContainer:
    """Manages a Docker container that serves HDFS for the integration tests.

    Container selection:

    * an existing container named ``python-hdfs`` is reused, and restarted if it is not
      running;
    * otherwise a new one is started from the image named by the
      ``GRAVITINO_CI_HIVE_DOCKER_IMAGE`` env variable.

    The container joins the ``python-net`` bridge network, and its IP address on that
    network is available through ``get_ip()``.
    """

    _docker_client = None
    _container = None
    _network = None
    _ip = ""
    _network_name = "python-net"
    _container_name = "python-hdfs"

    def __init__(self):
        self._docker_client = docker.from_env()
        self._create_networks()
        try:
            container = self._docker_client.containers.get(self._container_name)
            if container is not None:
                if container.status != "running":
                    container.restart()
                self._container = container
        except NotFound:
            logger.warning(
                "Cannot find hdfs container in docker env, will create a new one."
            )
        if self._container is None:
            image_name = os.environ.get("GRAVITINO_CI_HIVE_DOCKER_IMAGE")
            if image_name is None:
                raise GravitinoRuntimeException(
                    "GRAVITINO_CI_HIVE_DOCKER_IMAGE env variable is not set."
                )
            self._container = self._docker_client.containers.run(
                image=image_name,
                name=self._container_name,
                detach=True,
                environment={"HADOOP_USER_NAME": "datastrato"},
                network=self._network_name,
            )
        # Wait for HDFS in both cases. A reused container that was just restarted is not
        # necessarily serving requests yet either.
        asyncio.run(check_hdfs_container_status(self._container))

        self._fetch_ip()

    def _create_networks(self):
        """Reuse the ``python-net`` network, or create it as a bridge on 10.20.31.16/28."""
        pool_config = tp.IPAMPool(subnet="10.20.31.16/28")
        ipam_config = tp.IPAMConfig(driver="default", pool_configs=[pool_config])
        networks = self._docker_client.networks.list()
        for network in networks:
            if network.name == self._network_name:
                self._network = network
                break
        if self._network is None:
            self._network = self._docker_client.networks.create(
                name=self._network_name, driver="bridge", ipam=ipam_config
            )

    def _fetch_ip(self):
        """Read the container's IP address on ``python-net`` from its inspect data."""
        if self._container is None:
            raise GravitinoRuntimeException("The HDFS container has not init.")

        container_info = self._docker_client.api.inspect_container(self._container.id)
        self._ip = container_info["NetworkSettings"]["Networks"][self._network_name][
            "IPAddress"
        ]

    def get_ip(self):
        """Return the container's IP address on the test network."""
        return self._ip

    def close(self):
        """Best-effort teardown: kill and remove the container, then remove the network.

        Each step is attempted even if an earlier one fails; failures are logged.
        """
        # docker-py signals failures with docker.errors.DockerException subclasses
        # (e.g. APIError when the container is already stopped), not RuntimeError.
        # Catching those keeps one failed step from skipping the remaining cleanup.
        try:
            self._container.kill()
        except docker.errors.DockerException as e:
            logger.warning(
                "Exception occurred while killing container %s : %s",
                self._container_name,
                e,
            )
        try:
            self._container.remove()
        except docker.errors.DockerException as e:
            logger.warning(
                "Exception occurred while removing container %s : %s",
                self._container_name,
                e,
            )
        try:
            self._network.remove()
        except docker.errors.DockerException as e:
            logger.warning(
                "Exception occurred while removing network %s : %s",
                self._network_name,
                e,
            )
diff --git a/clients/client-python/tests/integration/integration_test_env.py
b/clients/client-python/tests/integration/integration_test_env.py
index a34bba23d..dde2c2410 100644
--- a/clients/client-python/tests/integration/integration_test_env.py
+++ b/clients/client-python/tests/integration/integration_test_env.py
@@ -26,6 +26,8 @@ import sys
import requests
+from gravitino.exceptions.gravitino_runtime_exception import
GravitinoRuntimeException
+
logger = logging.getLogger(__name__)
@@ -132,3 +134,87 @@ class IntegrationTestEnv(unittest.TestCase):
if gravitino_server_running:
logger.error("Can't stop Gravitino server!")
+
+ @classmethod
+ def restart_server(cls):
+ logger.info("Restarting Gravitino server...")
+ gravitino_home = os.environ.get("GRAVITINO_HOME")
+ gravitino_startup_script = os.path.join(gravitino_home,
"bin/gravitino.sh")
+ if not os.path.exists(gravitino_startup_script):
+ raise GravitinoRuntimeException(
+ f"Can't find Gravitino startup script:
{gravitino_startup_script}, "
+ "Please execute `./gradlew compileDistribution -x test` in the
Gravitino "
+ "project root directory."
+ )
+
+ # Restart Gravitino Server
+ env_vars = os.environ.copy()
+ env_vars["HADOOP_USER_NAME"] = "datastrato"
+ result = subprocess.run(
+ [gravitino_startup_script, "restart"],
+ env=env_vars,
+ capture_output=True,
+ text=True,
+ check=False,
+ )
+ if result.stdout:
+ logger.info("stdout: %s", result.stdout)
+ if result.stderr:
+ logger.info("stderr: %s", result.stderr)
+
+ if not check_gravitino_server_status():
+ raise GravitinoRuntimeException("ERROR: Can't start Gravitino
server!")
+
+ @classmethod
+ def _append_catalog_hadoop_conf(cls, config):
+ logger.info("Append catalog hadoop conf.")
+ gravitino_home = os.environ.get("GRAVITINO_HOME")
+ if gravitino_home is None:
+ raise GravitinoRuntimeException("Cannot find GRAVITINO_HOME env.")
+ hadoop_conf_path = f"{gravitino_home}/catalogs/hadoop/conf/hadoop.conf"
+ if not os.path.exists(hadoop_conf_path):
+ raise GravitinoRuntimeException(
+ f"Hadoop conf file is not found at `{hadoop_conf_path}`."
+ )
+
+ with open(hadoop_conf_path, mode="a", encoding="utf-8") as f:
+ for key, value in config.items():
+ f.write(f"\n{key} = {value}")
+
+ @classmethod
+ def _reset_catalog_hadoop_conf(cls, config):
+ logger.info("Reset catalog hadoop conf.")
+ gravitino_home = os.environ.get("GRAVITINO_HOME")
+ if gravitino_home is None:
+ raise GravitinoRuntimeException("Cannot find GRAVITINO_HOME env.")
+ hadoop_conf_path = f"{gravitino_home}/catalogs/hadoop/conf/hadoop.conf"
+ if not os.path.exists(hadoop_conf_path):
+ raise GravitinoRuntimeException(
+ f"Hadoop conf file is not found at `{hadoop_conf_path}`."
+ )
+ filtered_lines = []
+ with open(hadoop_conf_path, mode="r", encoding="utf-8") as file:
+ origin_lines = file.readlines()
+
+ existed_config = {}
+ for line in origin_lines:
+ line = line.strip()
+ if line.startswith("#"):
+ # append annotations directly
+ filtered_lines.append(line + "\n")
+ else:
+ try:
+ key, value = line.split("=")
+ existed_config[key.strip()] = value.strip()
+ except ValueError:
+ # cannot split to key, value, so just append
+ filtered_lines.append(line + "\n")
+
+ for key, value in existed_config.items():
+ if config[key] is None:
+ append_line = f"{key} = {value}\n"
+ filtered_lines.append(append_line)
+
+ with open(hadoop_conf_path, mode="w", encoding="utf-8") as file:
+ for line in filtered_lines:
+ file.write(line)
diff --git a/clients/client-python/tests/integration/test_gvfs_with_hdfs.py
b/clients/client-python/tests/integration/test_gvfs_with_hdfs.py
new file mode 100644
index 000000000..442bbc87e
--- /dev/null
+++ b/clients/client-python/tests/integration/test_gvfs_with_hdfs.py
@@ -0,0 +1,704 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+"""
+
+import logging
+import os
+import platform
+import unittest
+from random import randint
+from typing import Dict
+
+import pandas
+import pyarrow as pa
+import pyarrow.dataset as dt
+import pyarrow.parquet as pq
+from fsspec.implementations.local import LocalFileSystem
+from fsspec.implementations.arrow import ArrowFSWrapper
+from llama_index.core import SimpleDirectoryReader
+from pyarrow.fs import HadoopFileSystem
+from gravitino import (
+ gvfs,
+ NameIdentifier,
+ GravitinoAdminClient,
+ GravitinoClient,
+ Catalog,
+ Fileset,
+)
+from gravitino.exceptions.gravitino_runtime_exception import
GravitinoRuntimeException
+from tests.integration.integration_test_env import IntegrationTestEnv
+from tests.integration.hdfs_container import HDFSContainer
+from tests.integration.base_hadoop_env import BaseHadoopEnvironment
+
+logger = logging.getLogger(__name__)
+
+DOCKER_TEST = os.environ.get("DOCKER_TEST")
+
+
+# The Hadoop distribution package does not have native hdfs libraries for
macOS / Windows systems
+# (`libhdfs.dylib` for macOS and `libhdfs.dll` for Windows), so the
integration tests cannot be run
+# on these two systems at present.
[email protected](
+ DOCKER_TEST == "false" or platform.system() != "Linux",
+ "Skipping tests on non-Linux systems or when DOCKER_TEST=false",
+)
+class TestGvfsWithHDFS(IntegrationTestEnv):
+ hdfs_container: HDFSContainer = None
+ config: Dict = None
+ metalake_name: str = "TestGvfsWithHDFS_metalake" + str(randint(1, 10000))
+ catalog_name: str = "test_gvfs_catalog" + str(randint(1, 10000))
+ catalog_provider: str = "hadoop"
+
+ schema_name: str = "test_gvfs_schema"
+
+ fileset_name: str = "test_gvfs_fileset"
+ fileset_comment: str = "fileset_comment"
+ fileset_storage_location: str = ""
+ fileset_properties_key1: str = "fileset_properties_key1"
+ fileset_properties_value1: str = "fileset_properties_value1"
+ fileset_properties_key2: str = "fileset_properties_key2"
+ fileset_properties_value2: str = "fileset_properties_value2"
+ fileset_properties: Dict[str, str] = {
+ fileset_properties_key1: fileset_properties_value1,
+ fileset_properties_key2: fileset_properties_value2,
+ }
+
+ schema_ident: NameIdentifier = NameIdentifier.of(
+ metalake_name, catalog_name, schema_name
+ )
+ fileset_ident: NameIdentifier = NameIdentifier.of(schema_name,
fileset_name)
+
+ gravitino_admin_client: GravitinoAdminClient = GravitinoAdminClient(
+ uri="http://localhost:8090"
+ )
+ gravitino_client: GravitinoClient = None
+
+ @classmethod
+ def setUpClass(cls):
+ cls.hdfs_container = HDFSContainer()
+ hdfs_container_ip = cls.hdfs_container.get_ip()
+ # init hadoop env
+ BaseHadoopEnvironment.init_hadoop_env()
+ cls.config = {
+ "gravitino.bypass.fs.defaultFS": f"hdfs://{hdfs_container_ip}:9000"
+ }
+ # append the hadoop conf to server
+ cls._append_catalog_hadoop_conf(cls.config)
+ # restart the server
+ cls.restart_server()
+ # create entity
+ cls._init_test_entities()
+
+ @classmethod
+ def tearDownClass(cls):
+ try:
+ cls._clean_test_data()
+ # reset server conf
+ cls._reset_catalog_hadoop_conf(cls.config)
+ # restart server
+ cls.restart_server()
+ # clear hadoop env
+ BaseHadoopEnvironment.clear_hadoop_env()
+ finally:
+ # close hdfs container
+ cls.hdfs_container.close()
+
+ @classmethod
+ def _init_test_entities(cls):
+ cls.gravitino_admin_client.create_metalake(
+ name=cls.metalake_name, comment="", properties={}
+ )
+ cls.gravitino_client = GravitinoClient(
+ uri="http://localhost:8090", metalake_name=cls.metalake_name
+ )
+ catalog = cls.gravitino_client.create_catalog(
+ name=cls.catalog_name,
+ catalog_type=Catalog.Type.FILESET,
+ provider=cls.catalog_provider,
+ comment="",
+ properties={},
+ )
+ catalog.as_schemas().create_schema(
+ schema_name=cls.schema_name, comment="", properties={}
+ )
+
+ cls.fileset_storage_location: str = (
+
f"hdfs://{cls.hdfs_container.get_ip()}:9000/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+ )
+ cls.fileset_gvfs_location = (
+
f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+ )
+ catalog.as_fileset_catalog().create_fileset(
+ ident=cls.fileset_ident,
+ fileset_type=Fileset.Type.MANAGED,
+ comment=cls.fileset_comment,
+ storage_location=cls.fileset_storage_location,
+ properties=cls.fileset_properties,
+ )
+ arrow_hadoop_fs = HadoopFileSystem(host=cls.hdfs_container.get_ip(),
port=9000)
+ cls.hdfs = ArrowFSWrapper(arrow_hadoop_fs)
+ cls.conf: Dict = {"fs.defaultFS":
f"hdfs://{cls.hdfs_container.get_ip()}:9000/"}
+
+ @classmethod
+ def _clean_test_data(cls):
+ try:
+ cls.gravitino_client = GravitinoClient(
+ uri="http://localhost:8090", metalake_name=cls.metalake_name
+ )
+ catalog = cls.gravitino_client.load_catalog(name=cls.catalog_name)
+ logger.info(
+ "Drop fileset %s[%s]",
+ cls.fileset_ident,
+
catalog.as_fileset_catalog().drop_fileset(ident=cls.fileset_ident),
+ )
+ logger.info(
+ "Drop schema %s[%s]",
+ cls.schema_ident,
+ catalog.as_schemas().drop_schema(
+ schema_name=cls.schema_name, cascade=True
+ ),
+ )
+ logger.info(
+ "Drop catalog %s[%s]",
+ cls.catalog_name,
+ cls.gravitino_client.drop_catalog(name=cls.catalog_name),
+ )
+ logger.info(
+ "Drop metalake %s[%s]",
+ cls.metalake_name,
+ cls.gravitino_admin_client.drop_metalake(cls.metalake_name),
+ )
+ except Exception as e:
+ logger.error("Clean test data failed: %s", e)
+
+ def test_ls(self):
+ ls_dir = self.fileset_gvfs_location + "/test_ls"
+ ls_actual_dir = self.fileset_storage_location + "/test_ls"
+
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ **self.conf,
+ )
+ self.hdfs.mkdir(ls_actual_dir)
+ self.assertTrue(self.hdfs.exists(ls_actual_dir))
+
+ ls_file = self.fileset_gvfs_location + "/test_ls/test.file"
+ ls_actual_file = self.fileset_storage_location + "/test_ls/test.file"
+ self.hdfs.touch(ls_actual_file)
+ self.assertTrue(self.hdfs.exists(ls_actual_file))
+
+ # test detail = false
+ file_list_without_detail = fs.ls(ls_dir, detail=False)
+ self.assertEqual(1, len(file_list_without_detail))
+ self.assertEqual(file_list_without_detail[0], ls_file[len("gvfs://")
:])
+
+ # test detail = true
+ file_list_with_detail = fs.ls(ls_dir, detail=True)
+ self.assertEqual(1, len(file_list_with_detail))
+ self.assertEqual(file_list_with_detail[0]["name"],
ls_file[len("gvfs://") :])
+
+ def test_info(self):
+ info_dir = self.fileset_gvfs_location + "/test_info"
+ info_actual_dir = self.fileset_storage_location + "/test_info"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ **self.conf,
+ )
+ self.hdfs.mkdir(info_actual_dir)
+ self.assertTrue(self.hdfs.exists(info_actual_dir))
+
+ info_file = self.fileset_gvfs_location + "/test_info/test.file"
+ info_actual_file = self.fileset_storage_location +
"/test_info/test.file"
+ self.hdfs.touch(info_actual_file)
+ self.assertTrue(self.hdfs.exists(info_actual_file))
+
+ dir_info = fs.info(info_dir)
+ self.assertEqual(dir_info["name"], info_dir[len("gvfs://") :])
+
+ file_info = fs.info(info_file)
+ self.assertEqual(file_info["name"], info_file[len("gvfs://") :])
+
+ def test_exist(self):
+ exist_dir = self.fileset_gvfs_location + "/test_exist"
+ exist_actual_dir = self.fileset_storage_location + "/test_exist"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ **self.conf,
+ )
+ self.hdfs.mkdir(exist_actual_dir)
+ self.assertTrue(self.hdfs.exists(exist_actual_dir))
+ self.assertTrue(fs.exists(exist_dir))
+
+ exist_file = self.fileset_gvfs_location + "/test_exist/test.file"
+ exist_actual_file = self.fileset_storage_location +
"/test_exist/test.file"
+ self.hdfs.touch(exist_actual_file)
+ self.assertTrue(self.hdfs.exists(exist_actual_file))
+ self.assertTrue(fs.exists(exist_file))
+
+ def test_cp_file(self):
+ cp_file_dir = self.fileset_gvfs_location + "/test_cp_file"
+ cp_file_actual_dir = self.fileset_storage_location + "/test_cp_file"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ **self.conf,
+ )
+ self.hdfs.mkdir(cp_file_actual_dir)
+ self.assertTrue(self.hdfs.exists(cp_file_actual_dir))
+ self.assertTrue(fs.exists(cp_file_dir))
+
+ cp_file_file = self.fileset_gvfs_location + "/test_cp_file/test.file"
+ cp_file_actual_file = self.fileset_storage_location +
"/test_cp_file/test.file"
+ self.hdfs.touch(cp_file_actual_file)
+ self.assertTrue(self.hdfs.exists(cp_file_actual_file))
+ self.assertTrue(fs.exists(cp_file_file))
+
+ with self.hdfs.open(cp_file_actual_file, "wb") as f:
+ f.write(b"test_file_1")
+
+ cp_file_new_file = self.fileset_gvfs_location +
"/test_cp_file/test_cp.file"
+ cp_file_new_actual_file = (
+ self.fileset_storage_location + "/test_cp_file/test_cp.file"
+ )
+ fs.cp_file(cp_file_file, cp_file_new_file)
+ self.assertTrue(fs.exists(cp_file_new_file))
+
+ with self.hdfs.open(cp_file_new_actual_file, "rb") as f:
+ result = f.read()
+ self.assertEqual(b"test_file_1", result)
+
+ def test_mv(self):
+ mv_dir = self.fileset_gvfs_location + "/test_mv"
+ mv_actual_dir = self.fileset_storage_location + "/test_mv"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ **self.conf,
+ )
+ self.hdfs.mkdir(mv_actual_dir)
+ self.assertTrue(self.hdfs.exists(mv_actual_dir))
+ self.assertTrue(fs.exists(mv_dir))
+
+ mv_new_dir = self.fileset_gvfs_location + "/test_mv_new"
+ mv_new_actual_dir = self.fileset_storage_location + "/test_mv_new"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ **self.conf,
+ )
+ self.hdfs.mkdir(mv_new_actual_dir)
+ self.assertTrue(self.hdfs.exists(mv_new_actual_dir))
+ self.assertTrue(fs.exists(mv_new_dir))
+
+ mv_file = self.fileset_gvfs_location + "/test_mv/test.file"
+ mv_actual_file = self.fileset_storage_location + "/test_mv/test.file"
+ self.hdfs.touch(mv_actual_file)
+ self.assertTrue(self.hdfs.exists(mv_actual_file))
+ self.assertTrue(fs.exists(mv_file))
+
+ mv_new_file = self.fileset_gvfs_location + "/test_mv_new/test_new.file"
+ mv_new_actual_file = (
+ self.fileset_storage_location + "/test_mv_new/test_new.file"
+ )
+
+ fs.mv(mv_file, mv_new_file)
+ self.assertTrue(fs.exists(mv_new_file))
+ self.assertTrue(self.hdfs.exists(mv_new_actual_file))
+
+ def test_rm(self):
+ rm_dir = self.fileset_gvfs_location + "/test_rm"
+ rm_actual_dir = self.fileset_storage_location + "/test_rm"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ **self.conf,
+ )
+ self.hdfs.mkdir(rm_actual_dir)
+ self.assertTrue(self.hdfs.exists(rm_actual_dir))
+ self.assertTrue(fs.exists(rm_dir))
+
+ rm_file = self.fileset_gvfs_location + "/test_rm/test.file"
+ rm_actual_file = self.fileset_storage_location + "/test_rm/test.file"
+ self.hdfs.touch(rm_file)
+ self.assertTrue(self.hdfs.exists(rm_actual_file))
+ self.assertTrue(fs.exists(rm_file))
+
+ # test delete file
+ fs.rm(rm_file)
+ self.assertFalse(fs.exists(rm_file))
+
+ # test delete dir with recursive = false
+ rm_new_file = self.fileset_gvfs_location + "/test_rm/test_new.file"
+ rm_new_actual_file = self.fileset_storage_location +
"/test_rm/test_new.file"
+ self.hdfs.touch(rm_new_actual_file)
+ self.assertTrue(self.hdfs.exists(rm_new_actual_file))
+ self.assertTrue(fs.exists(rm_new_file))
+ with self.assertRaises(ValueError):
+ fs.rm(rm_dir, recursive=False)
+
+ # test delete dir with recursive = true
+ fs.rm(rm_dir, recursive=True)
+ self.assertFalse(fs.exists(rm_dir))
+
+ def test_rm_file(self):
+ rm_file_dir = self.fileset_gvfs_location + "/test_rm_file"
+ rm_file_actual_dir = self.fileset_storage_location + "/test_rm_file"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ **self.conf,
+ )
+ self.hdfs.mkdir(rm_file_actual_dir)
+ self.assertTrue(self.hdfs.exists(rm_file_actual_dir))
+ self.assertTrue(fs.exists(rm_file_dir))
+
+ rm_file_file = self.fileset_gvfs_location + "/test_rm_file/test.file"
+ rm_file_actual_file = self.fileset_storage_location +
"/test_rm_file/test.file"
+ self.hdfs.touch(rm_file_actual_file)
+ self.assertTrue(self.hdfs.exists(rm_file_actual_file))
+ self.assertTrue(fs.exists(rm_file_file))
+
+ # test delete file
+ fs.rm_file(rm_file_file)
+ self.assertFalse(fs.exists(rm_file_file))
+
+ # test delete dir
+ with self.assertRaises(OSError):
+ fs.rm_file(rm_file_dir)
+
+ def test_rmdir(self):
+ rmdir_dir = self.fileset_gvfs_location + "/test_rmdir"
+ rmdir_actual_dir = self.fileset_storage_location + "/test_rmdir"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ **self.conf,
+ )
+ self.hdfs.mkdir(rmdir_actual_dir)
+ self.assertTrue(self.hdfs.exists(rmdir_actual_dir))
+ self.assertTrue(fs.exists(rmdir_dir))
+
+ rmdir_file = self.fileset_gvfs_location + "/test_rmdir/test.file"
+ rmdir_actual_file = self.fileset_storage_location +
"/test_rmdir/test.file"
+ self.hdfs.touch(rmdir_actual_file)
+ self.assertTrue(self.hdfs.exists(rmdir_actual_file))
+ self.assertTrue(fs.exists(rmdir_file))
+
+ # test delete file
+ with self.assertRaises(OSError):
+ fs.rmdir(rmdir_file)
+
+ # test delete dir
+ fs.rmdir(rmdir_dir)
+ self.assertFalse(fs.exists(rmdir_dir))
+
+ def test_open(self):
+ open_dir = self.fileset_gvfs_location + "/test_open"
+ open_actual_dir = self.fileset_storage_location + "/test_open"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ **self.conf,
+ )
+ self.hdfs.mkdir(open_actual_dir)
+ self.assertTrue(self.hdfs.exists(open_actual_dir))
+ self.assertTrue(fs.exists(open_dir))
+
+ open_file = self.fileset_gvfs_location + "/test_open/test.file"
+ open_actual_file = self.fileset_storage_location +
"/test_open/test.file"
+ self.hdfs.touch(open_actual_file)
+ self.assertTrue(self.hdfs.exists(open_actual_file))
+ self.assertTrue(fs.exists(open_file))
+
+ # test open and write file
+ with fs.open(open_file, mode="wb") as f:
+ f.write(b"test_open_write")
+ self.assertTrue(fs.info(open_file)["size"] > 0)
+
+ # test open and read file
+ with fs.open(open_file, mode="rb") as f:
+ self.assertEqual(b"test_open_write", f.read())
+
+ def test_mkdir(self):
+ mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
+ mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ **self.conf,
+ )
+ fs.mkdir(mkdir_dir)
+ self.assertTrue(fs.exists(mkdir_dir))
+ self.assertTrue(self.hdfs.exists(mkdir_actual_dir))
+
+ # test mkdir dir with create_parents = false
+ parent_not_exist_virtual_path = mkdir_dir + "/not_exist/sub_dir"
+ self.assertFalse(fs.exists(parent_not_exist_virtual_path))
+
+ with self.assertRaises(OSError):
+ fs.mkdir(parent_not_exist_virtual_path, create_parents=False)
+
+ # test mkdir dir with create_parents = true
+ parent_not_exist_virtual_path2 = mkdir_dir + "/not_exist/sub_dir"
+ self.assertFalse(fs.exists(parent_not_exist_virtual_path2))
+
+ fs.mkdir(parent_not_exist_virtual_path2, create_parents=True)
+ self.assertTrue(fs.exists(parent_not_exist_virtual_path2))
+
+ def test_makedirs(self):
+ makedirs_dir = self.fileset_gvfs_location + "/test_makedirs"
+ makedirs_actual_dir = self.fileset_storage_location + "/test_makedirs"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ **self.conf,
+ )
+ fs.makedirs(makedirs_dir)
+ self.assertTrue(fs.exists(makedirs_dir))
+ self.assertTrue(self.hdfs.exists(makedirs_actual_dir))
+
+ # test mkdir dir not exist
+ parent_not_exist_virtual_path = makedirs_dir + "/not_exist/sub_dir"
+ self.assertFalse(fs.exists(parent_not_exist_virtual_path))
+ fs.makedirs(parent_not_exist_virtual_path)
+ self.assertTrue(fs.exists(parent_not_exist_virtual_path))
+
+ def test_created(self):
+ created_dir = self.fileset_gvfs_location + "/test_created"
+ created_actual_dir = self.fileset_storage_location + "/test_created"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ **self.conf,
+ )
+ self.hdfs.mkdir(created_actual_dir)
+ self.assertTrue(self.hdfs.exists(created_actual_dir))
+ self.assertTrue(fs.exists(created_dir))
+
+ with self.assertRaises(GravitinoRuntimeException):
+ fs.created(created_dir)
+
+ def test_modified(self):
+ modified_dir = self.fileset_gvfs_location + "/test_modified"
+ modified_actual_dir = self.fileset_storage_location + "/test_modified"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ **self.conf,
+ )
+ self.hdfs.mkdir(modified_actual_dir)
+ self.assertTrue(self.hdfs.exists(modified_actual_dir))
+ self.assertTrue(fs.exists(modified_dir))
+
+ # test mkdir dir which exists
+ self.assertIsNotNone(fs.modified(modified_dir))
+
+ def test_cat_file(self):
+ cat_dir = self.fileset_gvfs_location + "/test_cat"
+ cat_actual_dir = self.fileset_storage_location + "/test_cat"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ **self.conf,
+ )
+ self.hdfs.mkdir(cat_actual_dir)
+ self.assertTrue(self.hdfs.exists(cat_actual_dir))
+ self.assertTrue(fs.exists(cat_dir))
+
+ cat_file = self.fileset_gvfs_location + "/test_cat/test.file"
+ cat_actual_file = self.fileset_storage_location + "/test_cat/test.file"
+ self.hdfs.touch(cat_actual_file)
+ self.assertTrue(self.hdfs.exists(cat_actual_file))
+ self.assertTrue(fs.exists(cat_file))
+
+ # test open and write file
+ with fs.open(cat_file, mode="wb") as f:
+ f.write(b"test_cat_file")
+ self.assertTrue(fs.info(cat_file)["size"] > 0)
+
+ # test cat file
+ content = fs.cat_file(cat_file)
+ self.assertEqual(b"test_cat_file", content)
+
+ def test_get_file(self):
+ get_dir = self.fileset_gvfs_location + "/test_get"
+ get_actual_dir = self.fileset_storage_location + "/test_get"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ **self.conf,
+ )
+ self.hdfs.mkdir(get_actual_dir)
+ self.assertTrue(self.hdfs.exists(get_actual_dir))
+ self.assertTrue(fs.exists(get_dir))
+
+ get_file = self.fileset_gvfs_location + "/test_get/test.file"
+ get_actual_file = self.fileset_storage_location + "/test_get/test.file"
+ self.hdfs.touch(get_actual_file)
+ self.assertTrue(self.hdfs.exists(get_actual_file))
+ self.assertTrue(fs.exists(get_file))
+
+ # test open and write file
+ with fs.open(get_file, mode="wb") as f:
+ f.write(b"test_get_file")
+ self.assertTrue(fs.info(get_file)["size"] > 0)
+
+ # test get file
+ local_fs = LocalFileSystem()
+ local_dir = "/tmp/test_gvfs_local_file_" + str(randint(1, 10000))
+ local_fs.makedirs(local_dir)
+ local_path = local_dir + "/get_file.txt"
+ local_fs.touch(local_path)
+ self.assertTrue(local_fs.exists(local_path))
+ fs.get_file(get_file, local_path)
+ self.assertEqual(b"test_get_file", local_fs.cat_file(local_path))
+ local_fs.rm(local_dir, recursive=True)
+
+ # test get a file to a remote file
+ remote_path = self.fileset_gvfs_location + "/test_file_2.par"
+ with self.assertRaises(GravitinoRuntimeException):
+ fs.get_file(get_file, remote_path)
+
+ def test_pandas(self):
+ pands_dir = self.fileset_gvfs_location + "/test_pandas"
+ pands_actual_dir = self.fileset_storage_location + "/test_pandas"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ **self.conf,
+ )
+ self.hdfs.mkdir(pands_actual_dir)
+ self.assertTrue(self.hdfs.exists(pands_actual_dir))
+ self.assertTrue(fs.exists(pands_dir))
+
+ data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21,
19, 18]})
+ # to parquet
+ parquet_file = self.fileset_gvfs_location + "/test_pandas/test.parquet"
+ parquet_actual_file = (
+ self.fileset_storage_location + "/test_pandas/test.parquet"
+ )
+ data.to_parquet(parquet_file, filesystem=fs)
+ self.assertTrue(fs.exists(parquet_file))
+ self.assertTrue(self.hdfs.exists(parquet_actual_file))
+
+ # read parquet
+ ds1 = pandas.read_parquet(path=parquet_file, filesystem=fs)
+ self.assertTrue(data.equals(ds1))
+ storage_options = {
+ "server_uri": "http://localhost:8090",
+ "metalake_name": self.metalake_name,
+ }
+ # to csv
+ csv_file = self.fileset_gvfs_location + "/test_pandas/test.csv"
+ csv_actual_file = self.fileset_storage_location +
"/test_pandas/test.csv"
+ data.to_csv(
+ csv_file,
+ index=False,
+ storage_options=storage_options,
+ )
+ self.assertTrue(fs.exists(csv_file))
+ self.assertTrue(self.hdfs.exists(csv_actual_file))
+
+ # read csv
+ ds2 = pandas.read_csv(csv_file, storage_options=storage_options)
+ self.assertTrue(data.equals(ds2))
+
+ def test_pyarrow(self):
+ pyarrow_dir = self.fileset_gvfs_location + "/test_pyarrow"
+ pyarrow_actual_dir = self.fileset_storage_location + "/test_pyarrow"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ **self.conf,
+ )
+ self.hdfs.mkdir(pyarrow_actual_dir)
+ self.assertTrue(self.hdfs.exists(pyarrow_actual_dir))
+ self.assertTrue(fs.exists(pyarrow_dir))
+
+ data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21,
19, 18]})
+ # to parquet
+ parquet_file = pyarrow_dir + "/test.parquet"
+ data.to_parquet(parquet_file, filesystem=fs)
+ self.assertTrue(fs.exists(parquet_file))
+
+ # read as arrow dataset
+ arrow_dataset = dt.dataset(parquet_file, filesystem=fs)
+ arrow_tb_1 = arrow_dataset.to_table()
+
+ arrow_tb_2 = pa.Table.from_pandas(data)
+ self.assertTrue(arrow_tb_1.equals(arrow_tb_2))
+
+ # read as arrow parquet dataset
+ arrow_tb_3 = pq.read_table(parquet_file, filesystem=fs)
+ self.assertTrue(arrow_tb_3.equals(arrow_tb_2))
+
+ def test_llama_index(self):
+ llama_dir = self.fileset_gvfs_location + "/test_llama"
+ llama_actual_dir = self.fileset_storage_location + "/test_llama"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ **self.conf,
+ )
+ self.hdfs.mkdir(llama_actual_dir)
+ self.assertTrue(self.hdfs.exists(llama_actual_dir))
+ self.assertTrue(fs.exists(llama_dir))
+ data = pandas.DataFrame({"Name": ["A", "B", "C", "D"], "ID": [20, 21,
19, 18]})
+
+ storage_options = {
+ "server_uri": "http://localhost:8090",
+ "metalake_name": self.metalake_name,
+ }
+ csv_file = llama_dir + "/test.csv"
+ # to csv
+ data.to_csv(
+ csv_file,
+ index=False,
+ storage_options=storage_options,
+ )
+ self.assertTrue(fs.exists(csv_file))
+ another_csv_file = llama_dir + "/sub_dir/test1.csv"
+ data.to_csv(
+ another_csv_file,
+ index=False,
+ storage_options=storage_options,
+ )
+ self.assertTrue(fs.exists(another_csv_file))
+
+ reader = SimpleDirectoryReader(
+ input_dir=llama_dir[len("gvfs://") :],
+ fs=fs,
+ recursive=True, # recursively searches all subdirectories
+ )
+ documents = reader.load_data()
+ self.assertEqual(len(documents), 2)
+ doc_1 = documents[0]
+ result_1 = [line.strip().split(", ") for line in
doc_1.text.split("\n")]
+ self.assertEqual(4, len(result_1))
+ for row in result_1:
+ if row[0] == "A":
+ self.assertEqual(row[1], "20")
+ elif row[0] == "B":
+ self.assertEqual(row[1], "21")
+ elif row[0] == "C":
+ self.assertEqual(row[1], "19")
+ elif row[0] == "D":
+ self.assertEqual(row[1], "18")
diff --git a/clients/client-python/tests/integration/test_simple_auth_client.py
b/clients/client-python/tests/integration/test_simple_auth_client.py
index 516db2add..a4ed77fe1 100644
--- a/clients/client-python/tests/integration/test_simple_auth_client.py
+++ b/clients/client-python/tests/integration/test_simple_auth_client.py
@@ -19,7 +19,6 @@ under the License.
import logging
import os
-import unittest
from random import randint
from typing import Dict
@@ -31,11 +30,12 @@ from gravitino import (
Fileset,
)
from gravitino.auth.simple_auth_provider import SimpleAuthProvider
+from tests.integration.integration_test_env import IntegrationTestEnv
logger = logging.getLogger(__name__)
-class TestSimpleAuthClient(unittest.TestCase):
+class TestSimpleAuthClient(IntegrationTestEnv):
creator: str = "test_client"
metalake_name: str = "TestClient_metalake" + str(randint(1, 10000))
catalog_name: str = "fileset_catalog"