This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 80aee85383 Python: Alter table plumbing and REST support (#6323)
80aee85383 is described below
commit 80aee853830ed0a62ff2575196e1f436939c6e06
Author: Fokko Driesprong <[email protected]>
AuthorDate: Thu Jun 22 08:56:00 2023 +0200
Python: Alter table plumbing and REST support (#6323)
* Alter table
* Make the CI happy
* Comments
* Thanks Ryan!
* Python: Bump dependencies to the latest version
* Remove from docs
* WIP
* Comments
* Make CI happy
* Update docstrings
* Do some renaming
* Add a context manager
* Rename commit to commit_transaction()
* Update docs
* Refresh in place
* Remove redundant call
* Load a fresh copy instead
* Fix the docstrings
* Restore CommitTableResponse
* Conflicts
---
python/mkdocs/docs/api.md | 168 ++++--------------
python/poetry.lock | 57 +++----
python/pyiceberg/catalog/__init__.py | 23 ++-
python/pyiceberg/catalog/dynamodb.py | 17 +-
python/pyiceberg/catalog/glue.py | 17 +-
python/pyiceberg/catalog/hive.py | 17 +-
python/pyiceberg/catalog/noop.py | 79 +++++++++
python/pyiceberg/catalog/rest.py | 74 +++++---
python/pyiceberg/exceptions.py | 8 +
python/pyiceberg/table/__init__.py | 318 ++++++++++++++++++++++++++++++++++-
python/tests/catalog/test_base.py | 7 +-
python/tests/catalog/test_rest.py | 8 +-
python/tests/cli/test_console.py | 9 +-
python/tests/io/test_pyarrow.py | 5 +
python/tests/table/test_init.py | 2 +
python/tests/test_integration.py | 52 ++++++
16 files changed, 660 insertions(+), 201 deletions(-)
diff --git a/python/mkdocs/docs/api.md b/python/mkdocs/docs/api.md
index ddd5ca180f..d3b8fceee5 100644
--- a/python/mkdocs/docs/api.md
+++ b/python/mkdocs/docs/api.md
@@ -78,110 +78,15 @@ catalog.load_table(("nyc", "taxis"))
# The tuple syntax can be used if the namespace or table contains a dot.
```
-This returns a `Table` that represents an Iceberg table:
-
-```python
-Table(
- identifier=('nyc', 'taxis'),
-
metadata_location='s3a://warehouse/wh/nyc.db/taxis/metadata/00002-6ea51ce3-62aa-4197-9cf8-43d07c3440ca.metadata.json',
- metadata=TableMetadataV2(
- location='s3a://warehouse/wh/nyc.db/taxis',
- table_uuid=UUID('ebd5d172-2162-453d-b586-1cdce52c1116'),
- last_updated_ms=1662633437826,
- last_column_id=19,
- schemas=[Schema(
- NestedField(field_id=1, name='VendorID', field_type=LongType(),
required=False),
- NestedField(field_id=2, name='tpep_pickup_datetime',
field_type=TimestamptzType(), required=False),
- NestedField(field_id=3, name='tpep_dropoff_datetime',
field_type=TimestamptzType(), required=False),
- NestedField(field_id=4, name='passenger_count',
field_type=DoubleType(), required=False),
- NestedField(field_id=5, name='trip_distance', field_type=DoubleType(),
required=False),
- NestedField(field_id=6, name='RatecodeID', field_type=DoubleType(),
required=False),
- NestedField(field_id=7, name='store_and_fwd_flag',
field_type=StringType(), required=False),
- NestedField(field_id=8, name='PULocationID', field_type=LongType(),
required=False),
- NestedField(field_id=9, name='DOLocationID', field_type=LongType(),
required=False),
- NestedField(field_id=10, name='payment_type', field_type=LongType(),
required=False),
- NestedField(field_id=11, name='fare_amount', field_type=DoubleType(),
required=False),
- NestedField(field_id=12, name='extra', field_type=DoubleType(),
required=False),
- NestedField(field_id=13, name='mta_tax', field_type=DoubleType(),
required=False),
- NestedField(field_id=14, name='tip_amount', field_type=DoubleType(),
required=False),
- NestedField(field_id=15, name='tolls_amount', field_type=DoubleType(),
required=False),
- NestedField(field_id=16, name='improvement_surcharge',
field_type=DoubleType(), required=False),
- NestedField(field_id=17, name='total_amount', field_type=DoubleType(),
required=False),
- NestedField(field_id=18, name='congestion_surcharge',
field_type=DoubleType(), required=False),
- NestedField(field_id=19, name='airport_fee', field_type=DoubleType(),
required=False)
- ),
- schema_id=0,
- identifier_field_ids=[]
- )],
- current_schema_id=0,
- partition_specs=[PartitionSpec(spec_id=0)],
- default_spec_id=0,
- last_partition_id=999,
- properties={
- 'owner': 'root',
- 'write.format.default': 'parquet'
- },
- current_snapshot_id=8334458494559715805,
- snapshots=[
- Snapshot(
- snapshot_id=7910949481055846233,
- parent_snapshot_id=None,
- sequence_number=None,
- timestamp_ms=1662489306555,
-
manifest_list='s3a://warehouse/wh/nyc.db/taxis/metadata/snap-7910949481055846233-1-3eb7a2e1-5b7a-4e76-a29a-3e29c176eea4.avro',
- summary=Summary(
- Operation.APPEND,
- **{
- 'spark.app.id': 'local-1662489289173',
- 'added-data-files': '1',
- 'added-records': '2979431',
- 'added-files-size': '46600777',
- 'changed-partition-count': '1',
- 'total-records': '2979431',
- 'total-files-size': '46600777',
- 'total-data-files': '1',
- 'total-delete-files': '0',
- 'total-position-deletes': '0',
- 'total-equality-deletes': '0'
- }
- ),
- schema_id=0
- ),
- ],
- snapshot_log=[
- SnapshotLogEntry(
- snapshot_id='7910949481055846233',
- timestamp_ms=1662489306555
- )
- ],
- metadata_log=[
- MetadataLogEntry(
-
metadata_file='s3a://warehouse/wh/nyc.db/taxis/metadata/00000-b58341ba-6a63-4eea-9b2f-e85e47c7d09f.metadata.json',
- timestamp_ms=1662489306555
- )
- ],
- sort_orders=[SortOrder(order_id=0)],
- default_sort_order_id=0,
- refs={
- 'main': SnapshotRef(
- snapshot_id=8334458494559715805,
- snapshot_ref_type=SnapshotRefType.BRANCH,
- min_snapshots_to_keep=None,
- max_snapshot_age_ms=None,
- max_ref_age_ms=None
- )
- },
- format_version=2,
- last_sequence_number=1
- )
-)
-```
+This returns a `Table` that represents an Iceberg table that can be queried
and altered.
### Directly from a metadata file
To load a table directly from a metadata file (i.e., **without** using a
catalog), you can use a `StaticTable` as follows:
```python
+from pyiceberg.table import StaticTable
+
table = StaticTable.from_metadata(
"s3a://warehouse/wh/nyc.db/taxis/metadata/00002-6ea51ce3-62aa-4197-9cf8-43d07c3440ca.metadata.json"
)
@@ -241,52 +146,37 @@ catalog.create_table(
)
```
-Which returns a newly created table:
+### Update table properties
+
+Set and remove properties through the `Transaction` API:
```python
-Table(
- identifier=('default', 'bids'),
-
metadata_location='/Users/fokkodriesprong/Desktop/docker-spark-iceberg/wh/bids//metadata/00000-c8cd93ab-f784-474d-a167-b1a86b05195f.metadata.json',
- metadata=TableMetadataV2(
-
location='/Users/fokkodriesprong/Desktop/docker-spark-iceberg/wh/bids/',
- table_uuid=UUID('38d4cb39-4945-4bf2-b374-984b5c4984d2'),
- last_updated_ms=1661847562069,
- last_column_id=4,
- schemas=[
- Schema(
- NestedField(field_id=1, name='datetime',
field_type=TimestampType(), required=False),
- NestedField(field_id=2, name='bid', field_type=DoubleType(),
required=False),
- NestedField(field_id=3, name='ask', field_type=DoubleType(),
required=False),
- NestedField(field_id=4, name='symbol',
field_type=StringType(), required=False)),
- schema_id=1,
- identifier_field_ids=[])
- ],
- current_schema_id=1,
- partition_specs=[
- PartitionSpec(
- PartitionField(source_id=1, field_id=1000,
transform=DayTransform(), name='datetime_day'),))
- ],
- default_spec_id=0,
- last_partition_id=1000,
- properties={},
- current_snapshot_id=None,
- snapshots=[],
- snapshot_log=[],
- metadata_log=[],
- sort_orders=[
- SortOrder(order_id=1, fields=[SortField(source_id=4,
transform=IdentityTransform(), direction=SortDirection.ASC,
null_order=NullOrder.NULLS_FIRST)])
- ],
- default_sort_order_id=1,
- refs={},
- format_version=2,
- last_sequence_number=0
- )
-)
+with table.transaction() as transaction:
+ transaction.set_properties(abc="def")
+
+assert table.properties == {"abc": "def"}
+
+with table.transaction() as transaction:
+ transaction.remove_properties("abc")
+
+assert table.properties == {}
+```
+
+Or, without a context manager:
+
+```python
+table = table.transaction().set_properties(abc="def").commit_transaction()
+
+assert table.properties == {"abc": "def"}
+
+table = table.transaction().remove_properties("abc").commit_transaction()
+
+assert table.properties == {}
```
-## Query a table
+## Query the data
-To query a table, a table scan is needed. A table scan accepts a filter,
columns and optionally a limit and a snapshot ID:
+To query a table, a table scan is needed. A table scan accepts a filter,
columns, optionally a limit and a snapshot ID:
```python
from pyiceberg.catalog import load_catalog
diff --git a/python/poetry.lock b/python/poetry.lock
index 729f4fc5ea..74dbe1683b 100644
--- a/python/poetry.lock
+++ b/python/poetry.lock
@@ -1703,13 +1703,13 @@ files = [
[[package]]
name = "platformdirs"
-version = "3.6.0"
+version = "3.7.0"
description = "A small Python package for determining appropriate
platform-specific dirs, e.g. a \"user data dir\"."
optional = false
python-versions = ">=3.7"
files = [
- {file = "platformdirs-3.6.0-py3-none-any.whl", hash =
"sha256:ffa199e3fbab8365778c4a10e1fbf1b9cd50707de826eb304b50e57ec0cc8d38"},
- {file = "platformdirs-3.6.0.tar.gz", hash =
"sha256:57e28820ca8094678b807ff529196506d7a21e17156cb1cddb3e74cebce54640"},
+ {file = "platformdirs-3.7.0-py3-none-any.whl", hash =
"sha256:cfd065ba43133ff103ab3bd10aecb095c2a0035fcd1f07217c9376900d94ba07"},
+ {file = "platformdirs-3.7.0.tar.gz", hash =
"sha256:87fbf6473e87c078d536980ba970a472422e94f17b752cfad17024c18876d481"},
]
[package.extras]
@@ -1718,13 +1718,13 @@ test = ["appdirs (==1.4.4)", "covdefaults (>=2.3)",
"pytest (>=7.3.1)", "pytest-
[[package]]
name = "pluggy"
-version = "1.0.0"
+version = "1.2.0"
description = "plugin and hook calling mechanisms for python"
optional = false
-python-versions = ">=3.6"
+python-versions = ">=3.7"
files = [
- {file = "pluggy-1.0.0-py2.py3-none-any.whl", hash =
"sha256:74134bbf457f031a36d68416e1509f34bd5ccc019f0bcc952c7b909d06b37bd3"},
- {file = "pluggy-1.0.0.tar.gz", hash =
"sha256:4224373bacce55f955a878bf9cfa763c1e360858e330072059e10bad68531159"},
+ {file = "pluggy-1.2.0-py3-none-any.whl", hash =
"sha256:c2fd55a7d7a3863cba1a013e4e2414658b1d07b6bc57b3919e0c63c9abb99849"},
+ {file = "pluggy-1.2.0.tar.gz", hash =
"sha256:d12f0c4b579b15f5e054301bb226ee85eeeba08ffec228092f8defbaa3a4c4b3"},
]
[package.extras]
@@ -2186,32 +2186,27 @@ files = [
[[package]]
name = "ray"
-version = "2.5.0"
+version = "2.5.1"
description = "Ray provides a simple, universal API for building distributed
applications."
optional = true
python-versions = "*"
files = [
- {file = "ray-2.5.0-cp310-cp310-macosx_10_15_universal2.whl", hash =
"sha256:d1bebc874e896880c1215f4c1a11697ada49fa1595d6d99d7c5b4dc03030df36"},
- {file = "ray-2.5.0-cp310-cp310-macosx_11_0_arm64.whl", hash =
"sha256:c0285df2d24cacc36ca64b7852178a9bf37e3fc88545752fc2b46c27396965c1"},
- {file = "ray-2.5.0-cp310-cp310-manylinux2014_aarch64.whl", hash =
"sha256:38935d46c2597c1d1f113e1c8f88e2716c67052c480de5b2a0265e0a1a5ce88f"},
- {file = "ray-2.5.0-cp310-cp310-manylinux2014_x86_64.whl", hash =
"sha256:d53a07c9a9dbc134945a26980f557e9ff0f591bf8cabed1a6ebf921768d1c8bd"},
- {file = "ray-2.5.0-cp310-cp310-win_amd64.whl", hash =
"sha256:ef26ba24461dad98365b48ef01e27e70bc9737f4cf4734115804153d7d9195dc"},
- {file = "ray-2.5.0-cp311-cp311-manylinux2014_aarch64.whl", hash =
"sha256:d714175a5000ca91f82646a9b72521118bb6d2db5568e1b7ae9ceb64769716b6"},
- {file = "ray-2.5.0-cp311-cp311-manylinux2014_x86_64.whl", hash =
"sha256:0cde929e63497ed5f1c8626e5ccf7595ef6acaf1e7e270ad7c12f8e1c7695244"},
- {file = "ray-2.5.0-cp37-cp37m-macosx_10_15_x86_64.whl", hash =
"sha256:7e5512abf62c05c9ff90b1c89a4e0f2e45ee00e73f816eb8265e3ebd92fe4064"},
- {file = "ray-2.5.0-cp37-cp37m-manylinux2014_aarch64.whl", hash =
"sha256:3bf36beb213f89c0eb1ec5ac6ffddc8f53e616be745167f00ca017abd8672a2d"},
- {file = "ray-2.5.0-cp37-cp37m-manylinux2014_x86_64.whl", hash =
"sha256:59c2448b07f45d9a9d8e594bb5337bd35a5fea04e42cb4211a3346c2c0d066b0"},
- {file = "ray-2.5.0-cp37-cp37m-win_amd64.whl", hash =
"sha256:63008dd659d9ef25b0e20f0e1a285e8266e0af68b1178bca1b6ae43e49a68104"},
- {file = "ray-2.5.0-cp38-cp38-macosx_10_15_x86_64.whl", hash =
"sha256:e9464e93d6b72e0da69b9c5ab0501cc40f2db14801e22c6b97fa4e8039647892"},
- {file = "ray-2.5.0-cp38-cp38-macosx_11_0_arm64.whl", hash =
"sha256:7dc00fac119bfa1c2f8ac456d50a728346d6f2722fb7a21bf70841fc7476c285"},
- {file = "ray-2.5.0-cp38-cp38-manylinux2014_aarch64.whl", hash =
"sha256:d76051519bd4ae39fda4a87536978cafdebf2843c1c29a9f734c503d8ea676cd"},
- {file = "ray-2.5.0-cp38-cp38-manylinux2014_x86_64.whl", hash =
"sha256:9a8e06dc5d4201129c28b6768a971c474b82a23935b2e40461ffc7f1c2f4942a"},
- {file = "ray-2.5.0-cp38-cp38-win_amd64.whl", hash =
"sha256:849014b62ca50ff106b7a5d41430346e2762b1c4c803673af076209925b8f912"},
- {file = "ray-2.5.0-cp39-cp39-macosx_10_15_x86_64.whl", hash =
"sha256:a1b52c12a3349d8e37357df31438b6f1b12c7719ef41bdf5089fc7e78e8ab212"},
- {file = "ray-2.5.0-cp39-cp39-macosx_11_0_arm64.whl", hash =
"sha256:25f3d50c27c4c4756259d093d152381c6604bb96684a0cf43c55ddcc2eb73f79"},
- {file = "ray-2.5.0-cp39-cp39-manylinux2014_aarch64.whl", hash =
"sha256:1cb4f6ef9cfdb69d2ae582f357e977527944390e2f5cbbf51efd8252ed4c9a11"},
- {file = "ray-2.5.0-cp39-cp39-manylinux2014_x86_64.whl", hash =
"sha256:662cff303c086369a29283badcd7445b7f911874d8407b2c589b1ccbf6028d2e"},
- {file = "ray-2.5.0-cp39-cp39-win_amd64.whl", hash =
"sha256:a2cea10981dad7cfd187edf5e225a667eb114269afc5f2321b52113ef2d86123"},
+ {file = "ray-2.5.1-cp310-cp310-macosx_10_15_universal2.whl", hash =
"sha256:43c1f3c662692bca6a03a7eb118e310c563ac64901b838c74ec1fe34c28dfce7"},
+ {file = "ray-2.5.1-cp310-cp310-macosx_11_0_arm64.whl", hash =
"sha256:bd47d617fcfc969d8f22814ebe283f2e7d3de63b3d4d34b968d905b7055f85cd"},
+ {file = "ray-2.5.1-cp310-cp310-manylinux2014_x86_64.whl", hash =
"sha256:75f2f91c80cf2cbfee630175bd0fff6ba109bcc054e378229134dd2736083dd7"},
+ {file = "ray-2.5.1-cp310-cp310-win_amd64.whl", hash =
"sha256:03bc24f865da7ad94b142c5c12b3827a95505cc1968366c825990b183d54a797"},
+ {file = "ray-2.5.1-cp311-cp311-manylinux2014_x86_64.whl", hash =
"sha256:8f561bb6067e5c10af4eb0f8c9185dedda5a0214fae4b187675632220dab5b69"},
+ {file = "ray-2.5.1-cp37-cp37m-macosx_10_15_x86_64.whl", hash =
"sha256:2f3a3667b5f40ed984ddab9854f3a0e9d88e12b8c078dd1b7964a7276693570d"},
+ {file = "ray-2.5.1-cp37-cp37m-manylinux2014_x86_64.whl", hash =
"sha256:9c4ede34d265fe52da74c8966e45c5d2f4c27a2d5fb68d4828bc3b428050ca39"},
+ {file = "ray-2.5.1-cp37-cp37m-win_amd64.whl", hash =
"sha256:e507339b6bb0bb85433dbce4e073f125a6e1da1009658963de0eaeb7ef4b7a93"},
+ {file = "ray-2.5.1-cp38-cp38-macosx_10_15_x86_64.whl", hash =
"sha256:4019a5000b23909c004dba4cc6b7457ffc798ace2db5489d2a2d7c37cb053323"},
+ {file = "ray-2.5.1-cp38-cp38-macosx_11_0_arm64.whl", hash =
"sha256:0289d3b6ecc86a260e51d23f7b04875ace5f9716ffd9a5a9f0072120d0866c37"},
+ {file = "ray-2.5.1-cp38-cp38-manylinux2014_x86_64.whl", hash =
"sha256:e7e91a93fced3695405d61849dff4efb8cec74f669b22c0276164136992f4528"},
+ {file = "ray-2.5.1-cp38-cp38-win_amd64.whl", hash =
"sha256:0e92c41c9326378bf2cdab3ae401a6279cda496da58d7a60564999de5dedcd2f"},
+ {file = "ray-2.5.1-cp39-cp39-macosx_10_15_x86_64.whl", hash =
"sha256:c7ca8df922d2a090c4f4a6e017e43e1da98b30e3ed086fc01f25624874c6b64a"},
+ {file = "ray-2.5.1-cp39-cp39-macosx_11_0_arm64.whl", hash =
"sha256:3cbe4284ee952bd9379210f08a2917bc7e74efb6ec71bd4f156e282e031cdbec"},
+ {file = "ray-2.5.1-cp39-cp39-manylinux2014_x86_64.whl", hash =
"sha256:a9b8bdcfce1646df2cdc89456bf6e1ff57091c433789e1fc34a8caf3a874b8c8"},
+ {file = "ray-2.5.1-cp39-cp39-win_amd64.whl", hash =
"sha256:5d6689e95706ba7ba4d13f9e09aed1498cee5736e0b74c61331585a3c1b086bf"},
]
[package.dependencies]
@@ -2239,8 +2234,8 @@ requests = "*"
[package.extras]
air = ["aiohttp (>=3.7)", "aiohttp-cors", "aiorwlock", "colorful", "fastapi",
"fsspec", "gpustat (>=1.0.0)", "numpy (>=1.20)", "opencensus", "pandas",
"pandas (>=1.3)", "prometheus-client (>=0.7.1)", "py-spy (>=0.2.0)", "pyarrow
(>=6.0.1)", "pydantic", "requests", "smart-open", "starlette", "tensorboardX
(>=1.9)", "uvicorn", "virtualenv (>=20.0.24,<20.21.1)"]
-all = ["aiohttp (>=3.7)", "aiohttp-cors", "aiorwlock", "colorful", "dm-tree",
"fastapi", "fsspec", "gpustat (>=1.0.0)", "gymnasium (==0.26.3)", "kubernetes",
"lz4", "numpy (>=1.20)", "opencensus", "opentelemetry-api",
"opentelemetry-exporter-otlp", "opentelemetry-sdk", "pandas", "pandas (>=1.3)",
"prometheus-client (>=0.7.1)", "py-spy (>=0.2.0)", "pyarrow (>=6.0.1)",
"pydantic", "pyyaml", "ray-cpp (==2.5.0)", "requests", "rich", "scikit-image",
"scipy", "smart-open", "starlette", "tensor [...]
-cpp = ["ray-cpp (==2.5.0)"]
+all = ["aiohttp (>=3.7)", "aiohttp-cors", "aiorwlock", "colorful", "dm-tree",
"fastapi", "fsspec", "gpustat (>=1.0.0)", "gymnasium (==0.26.3)", "kubernetes",
"lz4", "numpy (>=1.20)", "opencensus", "opentelemetry-api",
"opentelemetry-exporter-otlp", "opentelemetry-sdk", "pandas", "pandas (>=1.3)",
"prometheus-client (>=0.7.1)", "py-spy (>=0.2.0)", "pyarrow (>=6.0.1)",
"pydantic", "pyyaml", "ray-cpp (==2.5.1)", "requests", "rich", "scikit-image",
"scipy", "smart-open", "starlette", "tensor [...]
+cpp = ["ray-cpp (==2.5.1)"]
data = ["fsspec", "numpy (>=1.20)", "pandas (>=1.3)", "pyarrow (>=6.0.1)"]
default = ["aiohttp (>=3.7)", "aiohttp-cors", "colorful", "gpustat (>=1.0.0)",
"opencensus", "prometheus-client (>=0.7.1)", "py-spy (>=0.2.0)", "pydantic",
"requests", "smart-open", "virtualenv (>=20.0.24,<20.21.1)"]
k8s = ["kubernetes", "urllib3"]
diff --git a/python/pyiceberg/catalog/__init__.py
b/python/pyiceberg/catalog/__init__.py
index 490d9c2060..f1be22b99e 100644
--- a/python/pyiceberg/catalog/__init__.py
+++ b/python/pyiceberg/catalog/__init__.py
@@ -40,7 +40,12 @@ from pyiceberg.manifest import ManifestFile
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import ToOutputFile
-from pyiceberg.table import Table, TableMetadata
+from pyiceberg.table import (
+ CommitTableRequest,
+ CommitTableResponse,
+ Table,
+ TableMetadata,
+)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import (
EMPTY_DICT,
@@ -322,6 +327,20 @@ class Catalog(ABC):
NoSuchTableError: If a table with the name does not exist.
"""
+ @abstractmethod
+ def _commit_table(self, table_request: CommitTableRequest) ->
CommitTableResponse:
+ """Updates one or more tables.
+
+ Args:
+ table_request (CommitTableRequest): The table requests to be
carried out.
+
+ Returns:
+ CommitTableResponse: The updated metadata.
+
+ Raises:
+ NoSuchTableError: If a table with the given identifier does not
exist.
+ """
+
@abstractmethod
def create_namespace(self, namespace: Union[str, Identifier], properties:
Properties = EMPTY_DICT) -> None:
"""Create a namespace in the catalog.
@@ -392,7 +411,7 @@ class Catalog(ABC):
@abstractmethod
def update_namespace_properties(
- self, namespace: Union[str, Identifier], removals: set[str] | None =
None, updates: Properties = EMPTY_DICT
+ self, namespace: Union[str, Identifier], removals: Optional[Set[str]]
= None, updates: Properties = EMPTY_DICT
) -> PropertiesUpdateSummary:
"""Removes provided property keys and updates properties for a
namespace.
diff --git a/python/pyiceberg/catalog/dynamodb.py
b/python/pyiceberg/catalog/dynamodb.py
index 642f6fbabf..d11ffb5d39 100644
--- a/python/pyiceberg/catalog/dynamodb.py
+++ b/python/pyiceberg/catalog/dynamodb.py
@@ -52,7 +52,7 @@ from pyiceberg.io import load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import Table
+from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT
@@ -168,6 +168,20 @@ class DynamoDbCatalog(Catalog):
return self.load_table(identifier=identifier)
+ def _commit_table(self, table_request: CommitTableRequest) ->
CommitTableResponse:
+ """Updates the table.
+
+ Args:
+ table_request (CommitTableRequest): The table requests to be
carried out.
+
+ Returns:
+ CommitTableResponse: The updated metadata.
+
+ Raises:
+ NoSuchTableError: If a table with the given identifier does not
exist.
+ """
+ raise NotImplementedError
+
def load_table(self, identifier: Union[str, Identifier]) -> Table:
"""
Loads the table's metadata and returns the table instance.
@@ -577,6 +591,7 @@ class DynamoDbCatalog(Catalog):
metadata=metadata,
metadata_location=metadata_location,
io=self._load_file_io(metadata.properties, metadata_location),
+ catalog=self,
)
diff --git a/python/pyiceberg/catalog/glue.py b/python/pyiceberg/catalog/glue.py
index 24c077615e..7e06f2e47f 100644
--- a/python/pyiceberg/catalog/glue.py
+++ b/python/pyiceberg/catalog/glue.py
@@ -51,7 +51,7 @@ from pyiceberg.io import load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import Table
+from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT
@@ -170,6 +170,7 @@ class GlueCatalog(Catalog):
metadata=metadata,
metadata_location=metadata_location,
io=self._load_file_io(metadata.properties, metadata_location),
+ catalog=self,
)
def _create_glue_table(self, database_name: str, table_name: str,
table_input: Dict[str, Any]) -> None:
@@ -224,6 +225,20 @@ class GlueCatalog(Catalog):
return self.load_table(identifier=identifier)
+ def _commit_table(self, table_request: CommitTableRequest) ->
CommitTableResponse:
+ """Updates the table.
+
+ Args:
+ table_request (CommitTableRequest): The table requests to be
carried out.
+
+ Returns:
+ CommitTableResponse: The updated metadata.
+
+ Raises:
+ NoSuchTableError: If a table with the given identifier does not
exist.
+ """
+ raise NotImplementedError
+
def load_table(self, identifier: Union[str, Identifier]) -> Table:
"""Loads the table's metadata and returns the table instance.
diff --git a/python/pyiceberg/catalog/hive.py b/python/pyiceberg/catalog/hive.py
index b8c91796a2..839fb2a3d5 100644
--- a/python/pyiceberg/catalog/hive.py
+++ b/python/pyiceberg/catalog/hive.py
@@ -66,7 +66,7 @@ from pyiceberg.io import FileIO, load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import Table
+from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT
@@ -243,6 +243,7 @@ class HiveCatalog(Catalog):
metadata=metadata,
metadata_location=metadata_location,
io=self._load_file_io(metadata.properties, metadata_location),
+ catalog=self,
)
def create_table(
@@ -302,6 +303,20 @@ class HiveCatalog(Catalog):
return self._convert_hive_into_iceberg(hive_table, io)
+ def _commit_table(self, table_request: CommitTableRequest) ->
CommitTableResponse:
+ """Updates the table.
+
+ Args:
+ table_request (CommitTableRequest): The table requests to be
carried out.
+
+ Returns:
+ CommitTableResponse: The updated metadata.
+
+ Raises:
+ NoSuchTableError: If a table with the given identifier does not
exist.
+ """
+ raise NotImplementedError
+
def load_table(self, identifier: Union[str, Identifier]) -> Table:
"""Loads the table's metadata and returns the table instance.
diff --git a/python/pyiceberg/catalog/noop.py b/python/pyiceberg/catalog/noop.py
new file mode 100644
index 0000000000..bb93772aa7
--- /dev/null
+++ b/python/pyiceberg/catalog/noop.py
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import (
+ List,
+ Optional,
+ Set,
+ Union,
+)
+
+from pyiceberg.catalog import Catalog, PropertiesUpdateSummary
+from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.table import (
+ CommitTableRequest,
+ CommitTableResponse,
+ SortOrder,
+ Table,
+)
+from pyiceberg.table.sorting import UNSORTED_SORT_ORDER
+from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
+
+
+class NoopCatalog(Catalog):
+ def create_table(
+ self,
+ identifier: Union[str, Identifier],
+ schema: Schema,
+ location: Optional[str] = None,
+ partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+ sort_order: SortOrder = UNSORTED_SORT_ORDER,
+ properties: Properties = EMPTY_DICT,
+ ) -> Table:
+ raise NotImplementedError
+
+ def load_table(self, identifier: Union[str, Identifier]) -> Table:
+ raise NotImplementedError
+
+ def drop_table(self, identifier: Union[str, Identifier]) -> None:
+ raise NotImplementedError
+
+ def rename_table(self, from_identifier: Union[str, Identifier],
to_identifier: Union[str, Identifier]) -> Table:
+ raise NotImplementedError
+
+ def _commit_table(self, table_request: CommitTableRequest) ->
CommitTableResponse:
+ raise NotImplementedError
+
+ def create_namespace(self, namespace: Union[str, Identifier], properties:
Properties = EMPTY_DICT) -> None:
+ raise NotImplementedError
+
+ def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
+ raise NotImplementedError
+
+ def list_tables(self, namespace: Union[str, Identifier]) ->
List[Identifier]:
+ raise NotImplementedError
+
+ def list_namespaces(self, namespace: Union[str, Identifier] = ()) ->
List[Identifier]:
+ raise NotImplementedError
+
+ def load_namespace_properties(self, namespace: Union[str, Identifier]) ->
Properties:
+ raise NotImplementedError
+
+ def update_namespace_properties(
+ self, namespace: Union[str, Identifier], removals: Optional[Set[str]]
= None, updates: Properties = EMPTY_DICT
+ ) -> PropertiesUpdateSummary:
+ raise NotImplementedError
diff --git a/python/pyiceberg/catalog/rest.py b/python/pyiceberg/catalog/rest.py
index 4c36390171..56b8d9316e 100644
--- a/python/pyiceberg/catalog/rest.py
+++ b/python/pyiceberg/catalog/rest.py
@@ -22,6 +22,7 @@ from typing import (
Literal,
Optional,
Set,
+ Tuple,
Type,
Union,
)
@@ -42,6 +43,8 @@ from pyiceberg.catalog import (
from pyiceberg.exceptions import (
AuthorizationExpiredError,
BadRequestError,
+ CommitFailedException,
+ CommitStateUnknownException,
ForbiddenError,
NamespaceAlreadyExistsError,
NoSuchNamespaceError,
@@ -55,7 +58,12 @@ from pyiceberg.exceptions import (
)
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
-from pyiceberg.table import Table, TableMetadata
+from pyiceberg.table import (
+ CommitTableRequest,
+ CommitTableResponse,
+ Table,
+ TableMetadata,
+)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, IcebergBaseModel
@@ -68,7 +76,7 @@ class Endpoints:
create_namespace: str = "namespaces"
load_namespace_metadata: str = "namespaces/{namespace}"
drop_namespace: str = "namespaces/{namespace}"
- update_properties: str = "namespaces/{namespace}/properties"
+ update_namespace_properties: str = "namespaces/{namespace}/properties"
list_tables: str = "namespaces/{namespace}/tables"
create_table: str = "namespaces/{namespace}/tables"
load_table: str = "namespaces/{namespace}/tables/{table}"
@@ -393,6 +401,17 @@ class RestCatalog(Catalog):
session.mount(self.uri, SigV4Adapter(**self.properties))
+ def _response_to_table(self, identifier_tuple: Tuple[str, ...],
table_response: TableResponse) -> Table:
+ return Table(
+ identifier=(self.name,) + identifier_tuple if self.name else
identifier_tuple,
+ metadata_location=table_response.metadata_location,
+ metadata=table_response.metadata,
+ io=self._load_file_io(
+ {**table_response.metadata.properties,
**table_response.config}, table_response.metadata_location
+ ),
+ catalog=self,
+ )
+
def create_table(
self,
identifier: Union[str, Identifier],
@@ -422,15 +441,7 @@ class RestCatalog(Catalog):
self._handle_non_200_response(exc, {409: TableAlreadyExistsError})
table_response = TableResponse(**response.json())
-
- return Table(
- identifier=(self.name,) + self.identifier_to_tuple(identifier),
- metadata_location=table_response.metadata_location,
- metadata=table_response.metadata,
- io=self._load_file_io(
- {**table_response.metadata.properties,
**table_response.config}, table_response.metadata_location
- ),
- )
+ return self._response_to_table(self.identifier_to_tuple(identifier),
table_response)
def list_tables(self, namespace: Union[str, Identifier]) ->
List[Identifier]:
namespace_tuple = self._check_valid_namespace_identifier(namespace)
@@ -455,14 +466,7 @@ class RestCatalog(Catalog):
self._handle_non_200_response(exc, {404: NoSuchTableError})
table_response = TableResponse(**response.json())
- return Table(
- identifier=(self.name,) + identifier_tuple if self.name else
identifier_tuple,
- metadata_location=table_response.metadata_location,
- metadata=table_response.metadata,
- io=self._load_file_io(
- {**table_response.metadata.properties,
**table_response.config}, table_response.metadata_location
- ),
- )
+ return self._response_to_table(identifier_tuple, table_response)
def drop_table(self, identifier: Union[str, Identifier], purge_requested:
bool = False) -> None:
response = self._session.delete(
@@ -489,6 +493,36 @@ class RestCatalog(Catalog):
return self.load_table(to_identifier)
+ def _commit_table(self, table_request: CommitTableRequest) ->
CommitTableResponse:
+ """Updates the table.
+
+ Args:
+ table_request (CommitTableRequest): The table requests to be
carried out.
+
+ Returns:
+ CommitTableResponse: The updated metadata.
+
+ Raises:
+ NoSuchTableError: If a table with the given identifier does not
exist.
+ """
+ response = self._session.post(
+ self.url(Endpoints.update_table, prefixed=True,
**self._split_identifier_for_path(table_request.identifier)),
+ data=table_request.json(),
+ )
+ try:
+ response.raise_for_status()
+ except HTTPError as exc:
+ self._handle_non_200_response(
+ exc,
+ {
+ 409: CommitFailedException,
+ 500: CommitStateUnknownException,
+ 502: CommitStateUnknownException,
+ 504: CommitStateUnknownException,
+ },
+ )
+ return CommitTableResponse(**response.json())
+
def create_namespace(self, namespace: Union[str, Identifier], properties:
Properties = EMPTY_DICT) -> None:
namespace_tuple = self._check_valid_namespace_identifier(namespace)
payload = {"namespace": namespace_tuple, "properties": properties}
@@ -541,7 +575,7 @@ class RestCatalog(Catalog):
namespace_tuple = self._check_valid_namespace_identifier(namespace)
namespace = NAMESPACE_SEPARATOR.join(namespace_tuple)
payload = {"removals": list(removals or []), "updates": updates}
- response = self._session.post(self.url(Endpoints.update_properties,
namespace=namespace), json=payload)
+ response =
self._session.post(self.url(Endpoints.update_namespace_properties,
namespace=namespace), json=payload)
try:
response.raise_for_status()
except HTTPError as exc:
diff --git a/python/pyiceberg/exceptions.py b/python/pyiceberg/exceptions.py
index 65b20caa88..f555543723 100644
--- a/python/pyiceberg/exceptions.py
+++ b/python/pyiceberg/exceptions.py
@@ -102,3 +102,11 @@ class ConditionalCheckFailedException(DynamoDbError):
class GenericDynamoDbError(DynamoDbError):
pass
+
+
+class CommitFailedException(RESTError):
+ """Commit failed, refresh and try again."""
+
+
+class CommitStateUnknownException(RESTError):
+    """Commit failed for an unknown reason."""
diff --git a/python/pyiceberg/table/__init__.py
b/python/pyiceberg/table/__init__.py
index 9c095a5319..0d433a65c6 100644
--- a/python/pyiceberg/table/__init__.py
+++ b/python/pyiceberg/table/__init__.py
@@ -18,6 +18,7 @@ from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
+from enum import Enum
from functools import cached_property
from itertools import chain
from multiprocessing.pool import ThreadPool
@@ -28,6 +29,7 @@ from typing import (
Dict,
Iterable,
List,
+ Literal,
Optional,
Set,
Tuple,
@@ -63,6 +65,7 @@ from pyiceberg.table.snapshots import Snapshot,
SnapshotLogEntry
from pyiceberg.table.sorting import SortOrder
from pyiceberg.typedef import (
EMPTY_DICT,
+ IcebergBaseModel,
Identifier,
KeyDefaultDict,
Properties,
@@ -74,24 +77,327 @@ if TYPE_CHECKING:
import ray
from duckdb import DuckDBPyConnection
+ from pyiceberg.catalog import Catalog
+
+
ALWAYS_TRUE = AlwaysTrue()
+class Transaction:
+ _table: Table
+ _updates: Tuple[TableUpdate, ...]
+ _requirements: Tuple[TableRequirement, ...]
+
+ def __init__(
+ self,
+ table: Table,
+ actions: Optional[Tuple[TableUpdate, ...]] = None,
+ requirements: Optional[Tuple[TableRequirement, ...]] = None,
+ ):
+ self._table = table
+ self._updates = actions or ()
+ self._requirements = requirements or ()
+
+ def __enter__(self) -> Transaction:
+ return self
+
+ def __exit__(self, _: Any, value: Any, traceback: Any) -> None:
+ fresh_table = self.commit_transaction()
+ # Update the new data in place
+ self._table.metadata = fresh_table.metadata
+ self._table.metadata_location = fresh_table.metadata_location
+
+ def _append_updates(self, *new_updates: TableUpdate) -> Transaction:
+ """Appends updates to the set of staged updates.
+
+ Args:
+ *new_updates: Any new updates.
+
+ Raises:
+ ValueError: When the type of update is not unique.
+
+ Returns:
+            A new Transaction object with the new updates appended.
+ """
+ for new_update in new_updates:
+ type_new_update = type(new_update)
+ if any(type(update) == type_new_update for update in
self._updates):
+ raise ValueError(f"Updates in a single commit need to be
unique, duplicate: {type_new_update}")
+ self._updates = self._updates + new_updates
+ return self
+
+ def set_table_version(self, format_version: Literal[1, 2]) -> Transaction:
+ """Sets the table to a certain version.
+
+ Args:
+ format_version: The newly set version.
+
+ Returns:
+            The transaction with the version update staged.
+ """
+ raise NotImplementedError("Not yet implemented")
+
+ def set_properties(self, **updates: str) -> Transaction:
+ """Set properties.
+
+ When a property is already set, it will be overwritten.
+
+ Args:
+ updates: The properties set on the table.
+
+ Returns:
+            The transaction with the property updates staged.
+ """
+ return self._append_updates(SetPropertiesUpdate(updates=updates))
+
+ def remove_properties(self, *removals: str) -> Transaction:
+ """Removes properties.
+
+ Args:
+ removals: Properties to be removed.
+
+ Returns:
+            The transaction with the property removals staged.
+ """
+ return self._append_updates(RemovePropertiesUpdate(removals=removals))
+
+ def update_location(self, location: str) -> Transaction:
+ """Sets the new table location.
+
+ Args:
+ location: The new location of the table.
+
+ Returns:
+            The transaction with the location update staged.
+ """
+ raise NotImplementedError("Not yet implemented")
+
+ def commit_transaction(self) -> Table:
+ """Commits the changes to the catalog.
+
+ Returns:
+ The table with the updates applied.
+ """
+ # Strip the catalog name
+ if len(self._updates) > 0:
+ response = self._table.catalog._commit_table( # pylint:
disable=W0212
+ CommitTableRequest(
+ identifier=self._table.identifier[1:],
+ requirements=self._requirements,
+ updates=self._updates,
+ )
+ )
+ # Update the metadata with the new one
+ self._table.metadata = response.metadata
+ self._table.metadata_location = response.metadata_location
+
+ return self._table
+ else:
+ return self._table
+
+
+class TableUpdateAction(Enum):
+ upgrade_format_version = "upgrade-format-version"
+ add_schema = "add-schema"
+ set_current_schema = "set-current-schema"
+ add_spec = "add-spec"
+ set_default_spec = "set-default-spec"
+ add_sort_order = "add-sort-order"
+ set_default_sort_order = "set-default-sort-order"
+ add_snapshot = "add-snapshot"
+ set_snapshot_ref = "set-snapshot-ref"
+ remove_snapshots = "remove-snapshots"
+ remove_snapshot_ref = "remove-snapshot-ref"
+ set_location = "set-location"
+ set_properties = "set-properties"
+ remove_properties = "remove-properties"
+
+
+class TableUpdate(IcebergBaseModel):
+ action: TableUpdateAction
+
+
+class UpgradeFormatVersionUpdate(TableUpdate):
+ action = TableUpdateAction.upgrade_format_version
+ format_version: int = Field(alias="format-version")
+
+
+class AddSchemaUpdate(TableUpdate):
+ action = TableUpdateAction.add_schema
+ schema_: Schema = Field(alias="schema")
+
+
+class SetCurrentSchemaUpdate(TableUpdate):
+ action = TableUpdateAction.set_current_schema
+ schema_id: int = Field(
+ alias="schema-id", description="Schema ID to set as current, or -1 to
set last added schema", default=-1
+ )
+
+
+class AddPartitionSpecUpdate(TableUpdate):
+ action = TableUpdateAction.add_spec
+ spec: PartitionSpec
+
+
+class SetDefaultSpecUpdate(TableUpdate):
+ action = TableUpdateAction.set_default_spec
+ spec_id: int = Field(
+ alias="spec-id", description="Partition spec ID to set as the default,
or -1 to set last added spec", default=-1
+ )
+
+
+class AddSortOrderUpdate(TableUpdate):
+ action = TableUpdateAction.add_sort_order
+ sort_order: SortOrder = Field(alias="sort-order")
+
+
+class SetDefaultSortOrderUpdate(TableUpdate):
+ action = TableUpdateAction.set_default_sort_order
+ sort_order_id: int = Field(
+ alias="sort-order-id", description="Sort order ID to set as the
default, or -1 to set last added sort order", default=-1
+ )
+
+
+class AddSnapshotUpdate(TableUpdate):
+ action = TableUpdateAction.add_snapshot
+ snapshot: Snapshot
+
+
+class SetSnapshotRefUpdate(TableUpdate):
+ action = TableUpdateAction.set_snapshot_ref
+ ref_name: str = Field(alias="ref-name")
+ type: Literal["tag", "branch"]
+ snapshot_id: int = Field(alias="snapshot-id")
+    max_ref_age_ms: int = Field(alias="max-ref-age-ms")
+ max_snapshot_age_ms: int = Field(alias="max-snapshot-age-ms")
+ min_snapshots_to_keep: int = Field(alias="min-snapshots-to-keep")
+
+
+class RemoveSnapshotsUpdate(TableUpdate):
+ action = TableUpdateAction.remove_snapshots
+ snapshot_ids: List[int] = Field(alias="snapshot-ids")
+
+
+class RemoveSnapshotRefUpdate(TableUpdate):
+ action = TableUpdateAction.remove_snapshot_ref
+ ref_name: str = Field(alias="ref-name")
+
+
+class SetLocationUpdate(TableUpdate):
+ action = TableUpdateAction.set_location
+ location: str
+
+
+class SetPropertiesUpdate(TableUpdate):
+ action = TableUpdateAction.set_properties
+ updates: Dict[str, str]
+
+
+class RemovePropertiesUpdate(TableUpdate):
+ action = TableUpdateAction.remove_properties
+ removals: List[str]
+
+
+class TableRequirement(IcebergBaseModel):
+ type: str
+
+
+class AssertCreate(TableRequirement):
+ """The table must not already exist; used for create transactions."""
+
+ type: Literal["assert-create"]
+
+
+class AssertTableUUID(TableRequirement):
+ """The table UUID must match the requirement's `uuid`."""
+
+ type: Literal["assert-table-uuid"]
+ uuid: str
+
+
+class AssertRefSnapshotId(TableRequirement):
+ """The table branch or tag identified by the requirement's `ref` must
reference the requirement's `snapshot-id`.
+
+    If `snapshot-id` is `null` or missing, the ref must not already exist.
+ """
+
+ type: Literal["assert-ref-snapshot-id"]
+ ref: str
+ snapshot_id: int = Field(..., alias="snapshot-id")
+
+
+class AssertLastAssignedFieldId(TableRequirement):
+ """The table's last assigned column id must match the requirement's
`last-assigned-field-id`."""
+
+ type: Literal["assert-last-assigned-field-id"]
+ last_assigned_field_id: int = Field(..., alias="last-assigned-field-id")
+
+
+class AssertCurrentSchemaId(TableRequirement):
+ """The table's current schema id must match the requirement's
`current-schema-id`."""
+
+ type: Literal["assert-current-schema-id"]
+ current_schema_id: int = Field(..., alias="current-schema-id")
+
+
+class AssertLastAssignedPartitionId(TableRequirement):
+ """The table's last assigned partition id must match the requirement's
`last-assigned-partition-id`."""
+
+ type: Literal["assert-last-assigned-partition-id"]
+ last_assigned_partition_id: int = Field(...,
alias="last-assigned-partition-id")
+
+
+class AssertDefaultSpecId(TableRequirement):
+ """The table's default spec id must match the requirement's
`default-spec-id`."""
+
+ type: Literal["assert-default-spec-id"]
+ default_spec_id: int = Field(..., alias="default-spec-id")
+
+
+class AssertDefaultSortOrderId(TableRequirement):
+ """The table's default sort order id must match the requirement's
`default-sort-order-id`."""
+
+ type: Literal["assert-default-sort-order-id"]
+ default_sort_order_id: int = Field(..., alias="default-sort-order-id")
+
+
+class CommitTableRequest(IcebergBaseModel):
+ identifier: Identifier = Field()
+ requirements: List[TableRequirement] = Field(default_factory=list)
+ updates: List[TableUpdate] = Field(default_factory=list)
+
+
+class CommitTableResponse(IcebergBaseModel):
+ metadata: TableMetadata = Field()
+ metadata_location: str = Field(alias="metadata-location")
+
+
class Table:
identifier: Identifier = Field()
metadata: TableMetadata = Field()
metadata_location: str = Field()
io: FileIO
+ catalog: Catalog
- def __init__(self, identifier: Identifier, metadata: TableMetadata,
metadata_location: str, io: FileIO) -> None:
+ def __init__(
+ self, identifier: Identifier, metadata: TableMetadata,
metadata_location: str, io: FileIO, catalog: Catalog
+ ) -> None:
self.identifier = identifier
self.metadata = metadata
self.metadata_location = metadata_location
self.io = io
+ self.catalog = catalog
+
+ def transaction(self) -> Transaction:
+ return Transaction(self)
def refresh(self) -> Table:
"""Refresh the current table metadata."""
- raise NotImplementedError("To be implemented")
+ fresh = self.catalog.load_table(self.identifier[1:])
+ self.metadata = fresh.metadata
+ self.io = fresh.io
+ self.metadata_location = fresh.metadata_location
+ return self
def name(self) -> Identifier:
"""Return the identifier of this table."""
@@ -142,6 +448,11 @@ class Table:
"""Return a dict of the sort orders of this table."""
return {sort_order.order_id: sort_order for sort_order in
self.metadata.sort_orders}
+ @property
+ def properties(self) -> Dict[str, str]:
+ """Properties of the table."""
+ return self.metadata.properties
+
def location(self) -> str:
"""Return the table's base location."""
return self.metadata.location
@@ -195,11 +506,14 @@ class StaticTable(Table):
metadata = FromInputFile.table_metadata(file)
+ from pyiceberg.catalog.noop import NoopCatalog
+
return cls(
identifier=("static-table", metadata_location),
metadata_location=metadata_location,
metadata=metadata,
io=load_file_io({**properties, **metadata.properties}),
+ catalog=NoopCatalog("static-table"),
)
diff --git a/python/tests/catalog/test_base.py
b/python/tests/catalog/test_base.py
index 742549a654..29b63e0900 100644
--- a/python/tests/catalog/test_base.py
+++ b/python/tests/catalog/test_base.py
@@ -42,7 +42,7 @@ from pyiceberg.exceptions import (
from pyiceberg.io import load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC,
PartitionField, PartitionSpec
from pyiceberg.schema import Schema
-from pyiceberg.table import Table
+from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
from pyiceberg.table.metadata import TableMetadataV1
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.transforms import IdentityTransform
@@ -103,10 +103,14 @@ class InMemoryCatalog(Catalog):
),
metadata_location=f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json',
io=load_file_io(),
+ catalog=self,
)
self.__tables[identifier] = table
return table
+ def _commit_table(self, table_request: CommitTableRequest) ->
CommitTableResponse:
+ raise NotImplementedError
+
def load_table(self, identifier: Union[str, Identifier]) -> Table:
identifier = Catalog.identifier_to_tuple(identifier)
try:
@@ -141,6 +145,7 @@ class InMemoryCatalog(Catalog):
metadata=table.metadata,
metadata_location=table.metadata_location,
io=load_file_io(),
+ catalog=self,
)
return self.__tables[to_identifier]
diff --git a/python/tests/catalog/test_rest.py
b/python/tests/catalog/test_rest.py
index 6f3cafffc1..a7663ac511 100644
--- a/python/tests/catalog/test_rest.py
+++ b/python/tests/catalog/test_rest.py
@@ -408,7 +408,8 @@ def test_load_table_200(rest_mock: Mocker) -> None:
status_code=200,
request_headers=TEST_HEADERS,
)
- actual = RestCatalog("rest", uri=TEST_URI,
token=TEST_TOKEN).load_table(("fokko", "table"))
+ catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
+ actual = catalog.load_table(("fokko", "table"))
expected = Table(
identifier=("rest", "fokko", "table"),
metadata_location="s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
@@ -484,6 +485,7 @@ def test_load_table_200(rest_mock: Mocker) -> None:
partition_spec=[],
),
io=load_file_io(),
+ catalog=catalog,
)
assert actual == expected
@@ -585,7 +587,8 @@ def test_create_table_200(rest_mock: Mocker,
table_schema_simple: Schema) -> Non
status_code=200,
request_headers=TEST_HEADERS,
)
- table = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).create_table(
+ catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
+ table = catalog.create_table(
identifier=("fokko", "fokko2"),
schema=table_schema_simple,
location=None,
@@ -639,6 +642,7 @@ def test_create_table_200(rest_mock: Mocker,
table_schema_simple: Schema) -> Non
partition_spec=[],
),
io=load_file_io(),
+ catalog=catalog,
)
diff --git a/python/tests/cli/test_console.py b/python/tests/cli/test_console.py
index 555071dd0d..4aaec26a78 100644
--- a/python/tests/cli/test_console.py
+++ b/python/tests/cli/test_console.py
@@ -26,12 +26,13 @@ from unittest import mock
from click.testing import CliRunner
from pyiceberg.catalog import Catalog, PropertiesUpdateSummary
+from pyiceberg.catalog.noop import NoopCatalog
from pyiceberg.cli.console import run
from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError
from pyiceberg.io import load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
-from pyiceberg.table import Table
+from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
from pyiceberg.table.metadata import TableMetadataV2
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
@@ -115,8 +116,12 @@ class MockCatalog(Catalog):
metadata_location="s3://tmp/",
metadata=TableMetadataV2(**EXAMPLE_TABLE_METADATA_V2),
io=load_file_io(),
+ catalog=self,
)
+ def _commit_table(self, table_request: CommitTableRequest) ->
CommitTableResponse:
+ raise NotImplementedError
+
def load_table(self, identifier: Union[str, Identifier]) -> Table:
tuple_identifier = Catalog.identifier_to_tuple(identifier)
if tuple_identifier == ("default", "foo"):
@@ -125,6 +130,7 @@ class MockCatalog(Catalog):
metadata_location="s3://tmp/",
metadata=TableMetadataV2(**EXAMPLE_TABLE_METADATA_V2),
io=load_file_io(),
+ catalog=NoopCatalog("NoopCatalog"),
)
else:
raise NoSuchTableError(f"Table does not exist:
{'.'.join(tuple_identifier)}")
@@ -147,6 +153,7 @@ class MockCatalog(Catalog):
metadata_location="s3://tmp/",
metadata=TableMetadataV2(**EXAMPLE_TABLE_METADATA_V2),
io=load_file_io(),
+ catalog=NoopCatalog("NoopCatalog"),
)
else:
raise NoSuchTableError(f"Table does not exist: {from_identifier}")
diff --git a/python/tests/io/test_pyarrow.py b/python/tests/io/test_pyarrow.py
index 3792aea03e..7a07532c66 100644
--- a/python/tests/io/test_pyarrow.py
+++ b/python/tests/io/test_pyarrow.py
@@ -27,6 +27,7 @@ import pytest
from pyarrow.fs import FileType, LocalFileSystem
from pyiceberg.avro.resolver import ResolveError
+from pyiceberg.catalog.noop import NoopCatalog
from pyiceberg.expressions import (
AlwaysFalse,
AlwaysTrue,
@@ -821,6 +822,7 @@ def project(
),
metadata_location="file://a/b/c.json",
io=PyArrowFileIO(),
+ catalog=NoopCatalog("NoopCatalog"),
),
expr or AlwaysTrue(),
schema,
@@ -1232,6 +1234,7 @@ def test_delete(deletes_file: str, example_task:
FileScanTask, table_schema_simp
),
metadata_location=metadata_location,
io=load_file_io(),
+ catalog=NoopCatalog("noop"),
),
row_filter=AlwaysTrue(),
projected_schema=table_schema_simple,
@@ -1274,6 +1277,7 @@ def test_delete_duplicates(deletes_file: str,
example_task: FileScanTask, table_
),
metadata_location=metadata_location,
io=load_file_io(),
+ catalog=NoopCatalog("noop"),
),
row_filter=AlwaysTrue(),
projected_schema=table_schema_simple,
@@ -1308,6 +1312,7 @@ def test_pyarrow_wrap_fsspec(example_task: FileScanTask,
table_schema_simple: Sc
),
metadata_location=metadata_location,
io=load_file_io(properties={"py-io-impl":
"pyiceberg.io.fsspec.FsspecFileIO"}, location=metadata_location),
+ catalog=NoopCatalog("NoopCatalog"),
),
case_sensitive=True,
projected_schema=table_schema_simple,
diff --git a/python/tests/table/test_init.py b/python/tests/table/test_init.py
index 36a518076b..b421f81493 100644
--- a/python/tests/table/test_init.py
+++ b/python/tests/table/test_init.py
@@ -20,6 +20,7 @@ from typing import Any, Dict
import pytest
from sortedcontainers import SortedList
+from pyiceberg.catalog.noop import NoopCatalog
from pyiceberg.expressions import (
AlwaysTrue,
And,
@@ -62,6 +63,7 @@ def table(example_table_metadata_v2: Dict[str, Any]) -> Table:
metadata=table_metadata,
metadata_location=f"{table_metadata.location}/uuid.metadata.json",
io=load_file_io(),
+ catalog=NoopCatalog("NoopCatalog"),
)
diff --git a/python/tests/test_integration.py b/python/tests/test_integration.py
index 14b3a36bda..3609920819 100644
--- a/python/tests/test_integration.py
+++ b/python/tests/test_integration.py
@@ -24,6 +24,7 @@ import pytest
from pyarrow.fs import S3FileSystem
from pyiceberg.catalog import Catalog, load_catalog
+from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.expressions import (
And,
GreaterThanOrEqual,
@@ -34,6 +35,13 @@ from pyiceberg.expressions import (
from pyiceberg.io.pyarrow import pyarrow_to_schema
from pyiceberg.schema import Schema
from pyiceberg.table import Table
+from pyiceberg.types import (
+ BooleanType,
+ IntegerType,
+ NestedField,
+ StringType,
+ TimestampType,
+)
@pytest.fixture()
@@ -70,6 +78,50 @@ def table_test_all_types(catalog: Catalog) -> Table:
return catalog.load_table("default.test_all_types")
+TABLE_NAME = ("default", "t1")
+
+
[email protected]()
+def table(catalog: Catalog) -> Table:
+ try:
+ catalog.drop_table(TABLE_NAME)
+ except NoSuchTableError:
+ pass # Just to make sure that the table doesn't exist
+
+ schema = Schema(
+ NestedField(field_id=1, name="str", field_type=StringType(),
required=False),
+ NestedField(field_id=2, name="int", field_type=IntegerType(),
required=True),
+ NestedField(field_id=3, name="bool", field_type=BooleanType(),
required=False),
+ NestedField(field_id=4, name="datetime", field_type=TimestampType(),
required=False),
+ schema_id=1,
+ )
+
+ return catalog.create_table(identifier=TABLE_NAME, schema=schema)
+
+
[email protected]
+def test_table_properties(table: Table) -> None:
+ assert table.properties == {}
+
+ with table.transaction() as transaction:
+ transaction.set_properties(abc="🤪")
+
+ assert table.properties == {"abc": "🤪"}
+
+ with table.transaction() as transaction:
+ transaction.remove_properties("abc")
+
+ assert table.properties == {}
+
+ table = table.transaction().set_properties(abc="def").commit_transaction()
+
+ assert table.properties == {"abc": "def"}
+
+ table = table.transaction().remove_properties("abc").commit_transaction()
+
+ assert table.properties == {}
+
+
@pytest.fixture()
def test_positional_mor_deletes(catalog: Catalog) -> Table:
"""Table that has positional deletes"""