This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 04ca8ae Add list-refs cli command (#137)
04ca8ae is described below
commit 04ca8ae87d0854a0749941c4d58fc2b5c96deccf
Author: Amogh Jahagirdar <[email protected]>
AuthorDate: Fri Nov 10 12:21:49 2023 -0800
Add list-refs cli command (#137)
* Python: Add list-refs CLI command
* Update pyiceberg/cli/console.py
---------
Co-authored-by: Fokko Driesprong <[email protected]>
---
pyiceberg/cli/console.py | 52 +++++++++++++++++++++++++++++++++++++++++++++
pyiceberg/cli/output.py | 35 +++++++++++++++++++++++++++++-
pyiceberg/table/__init__.py | 5 +++++
3 files changed, 91 insertions(+), 1 deletion(-)
diff --git a/pyiceberg/cli/console.py b/pyiceberg/cli/console.py
index 62f7a02..bd1164f 100644
--- a/pyiceberg/cli/console.py
+++ b/pyiceberg/cli/console.py
@@ -19,6 +19,7 @@ from functools import wraps
from typing import (
Any,
Callable,
+ Dict,
Literal,
Optional,
Tuple,
@@ -31,6 +32,10 @@ from pyiceberg import __version__
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.cli.output import ConsoleOutput, JsonOutput, Output
from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchPropertyException, NoSuchTableError
+from pyiceberg.table.refs import SnapshotRef
+
+DEFAULT_MIN_SNAPSHOTS_TO_KEEP = 1
+DEFAULT_MAX_SNAPSHOT_AGE_MS = 432000000
def catch_exception() -> Callable: # type: ignore
@@ -372,3 +377,50 @@ def table(ctx: Context, identifier: str, property_name: str) -> None: # noqa: F
ctx.exit(1)
else:
raise NoSuchPropertyException(f"Property {property_name} does not exist on {identifier}")
+
+
+@run.command()
+@click.argument("identifier")
+@click.option("--type", required=False)
+@click.option("--verbose", type=click.BOOL)
+@click.pass_context
+@catch_exception()
+def list_refs(ctx: Context, identifier: str, type: str, verbose: bool) -> None:
+ """List all the refs in the provided table."""
+ catalog, output = _catalog_and_output(ctx)
+ table = catalog.load_table(identifier)
+ refs = table.refs()
+ if type:
+ type = type.lower()
+ if type not in {"branch", "tag"}:
+ raise ValueError(f"Type must be either branch or tag, got: {type}")
+
+ relevant_refs = [
+ (ref_name, ref.snapshot_ref_type, _retention_properties(ref, table.properties))
+ for (ref_name, ref) in refs.items()
+ if not type or ref.snapshot_ref_type == type
+ ]
+
+ output.describe_refs(relevant_refs)
+
+
+def _retention_properties(ref: SnapshotRef, table_properties: Dict[str, str]) -> Dict[str, str]:
+ retention_properties = {}
+ if ref.snapshot_ref_type == "branch":
+ default_min_snapshots_to_keep = table_properties.get(
+ "history.expire.min-snapshots-to-keep", DEFAULT_MIN_SNAPSHOTS_TO_KEEP
+ )
+ retention_properties["min_snapshots_to_keep"] = (
+ str(ref.min_snapshots_to_keep) if ref.min_snapshots_to_keep else str(default_min_snapshots_to_keep)
+ )
+ default_max_snapshot_age_ms = table_properties.get("history.expire.max-snapshot-age-ms", DEFAULT_MAX_SNAPSHOT_AGE_MS)
+ retention_properties["max_snapshot_age_ms"] = (
+ str(ref.max_snapshot_age_ms) if ref.max_snapshot_age_ms else str(default_max_snapshot_age_ms)
+ )
+ else:
+ retention_properties["min_snapshots_to_keep"] = "N/A"
+ retention_properties["max_snapshot_age_ms"] = "N/A"
+
+ retention_properties["max_ref_age_ms"] = str(ref.max_ref_age_ms) if ref.max_ref_age_ms else "forever"
+
+ return retention_properties
diff --git a/pyiceberg/cli/output.py b/pyiceberg/cli/output.py
index 299f84d..8e0ad2d 100644
--- a/pyiceberg/cli/output.py
+++ b/pyiceberg/cli/output.py
@@ -16,7 +16,13 @@
# under the License.
import json
from abc import ABC, abstractmethod
-from typing import Any, List, Optional
+from typing import (
+ Any,
+ Dict,
+ List,
+ Optional,
+ Tuple,
+)
from uuid import UUID
from rich.console import Console
@@ -26,6 +32,7 @@ from rich.tree import Tree
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import Table, TableMetadata
+from pyiceberg.table.refs import SnapshotRefType
from pyiceberg.typedef import IcebergBaseModel, Identifier, Properties
@@ -72,6 +79,10 @@ class Output(ABC):
def version(self, version: str) -> None:
...
+ @abstractmethod
+ def describe_refs(self, refs: List[Tuple[str, SnapshotRefType, Dict[str, str]]]) -> None:
+ ...
+
class ConsoleOutput(Output):
"""Writes to the console."""
@@ -174,6 +185,19 @@ class ConsoleOutput(Output):
def version(self, version: str) -> None:
Console().print(version)
+ def describe_refs(self, ref_details: List[Tuple[str, SnapshotRefType, Dict[str, str]]]) -> None:
+ refs_table = RichTable(title="Snapshot Refs")
+ refs_table.add_column("Ref")
+ refs_table.add_column("Type")
+ refs_table.add_column("Max ref age ms")
+ refs_table.add_column("Min snapshots to keep")
+ refs_table.add_column("Max snapshot age ms")
+ for name, type, ref_detail in ref_details:
+ refs_table.add_row(
+ name, type, ref_detail["max_ref_age_ms"], ref_detail["min_snapshots_to_keep"], ref_detail["max_snapshot_age_ms"]
+ )
+ Console().print(refs_table)
+
class JsonOutput(Output):
"""Writes json to stdout."""
@@ -226,3 +250,12 @@ class JsonOutput(Output):
def version(self, version: str) -> None:
self._out({"version": version})
+
+ def describe_refs(self, refs: List[Tuple[str, SnapshotRefType, Dict[str, str]]]) -> None:
+ self._out(
+ [
+ {"name": name, "type": type, detail_key: detail_val}
+ for name, type, detail in refs
+ for detail_key, detail_val in detail.items()
+ ]
+ )
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 21a3b7f..b0f2bcd 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -70,6 +70,7 @@ from pyiceberg.schema import (
visit,
)
from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadata
+from pyiceberg.table.refs import SnapshotRef
from pyiceberg.table.snapshots import Snapshot, SnapshotLogEntry
from pyiceberg.table.sorting import SortOrder
from pyiceberg.typedef import (
@@ -569,6 +570,10 @@ class Table:
def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema:
return UpdateSchema(self, allow_incompatible_changes=allow_incompatible_changes, case_sensitive=case_sensitive)
+ def refs(self) -> Dict[str, SnapshotRef]:
+ """Return the snapshot references in the table."""
+ return self.metadata.refs
+
def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None:
response = self.catalog._commit_table( # pylint: disable=W0212
CommitTableRequest(