This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 80aee85383 Python: Alter table plumbing and REST support (#6323)
80aee85383 is described below

commit 80aee853830ed0a62ff2575196e1f436939c6e06
Author: Fokko Driesprong <[email protected]>
AuthorDate: Thu Jun 22 08:56:00 2023 +0200

    Python: Alter table plumbing and REST support (#6323)
    
    * Alter table
    
    * Make the CI happy
    
    * Comments
    
    * Thanks Ryan!
    
    * Python: Bump dependencies to the latest version
    
    * Remove from docs
    
    * WIP
    
    * Comments
    
    * Make CI happy
    
    * Update docstrings
    
    * Do some renaming
    
    * Add a context manager
    
    * Rename commit to commit_transaction()
    
    * Update docs
    
    * Refresh in place
    
    * Remove redundant call
    
    * Load a fresh copy instead
    
    * Fix the docstrings
    
    * Restore CommitTableResponse
    
    * Conflicts
---
 python/mkdocs/docs/api.md            | 168 ++++--------------
 python/poetry.lock                   |  57 +++----
 python/pyiceberg/catalog/__init__.py |  23 ++-
 python/pyiceberg/catalog/dynamodb.py |  17 +-
 python/pyiceberg/catalog/glue.py     |  17 +-
 python/pyiceberg/catalog/hive.py     |  17 +-
 python/pyiceberg/catalog/noop.py     |  79 +++++++++
 python/pyiceberg/catalog/rest.py     |  74 +++++---
 python/pyiceberg/exceptions.py       |   8 +
 python/pyiceberg/table/__init__.py   | 318 ++++++++++++++++++++++++++++++++++-
 python/tests/catalog/test_base.py    |   7 +-
 python/tests/catalog/test_rest.py    |   8 +-
 python/tests/cli/test_console.py     |   9 +-
 python/tests/io/test_pyarrow.py      |   5 +
 python/tests/table/test_init.py      |   2 +
 python/tests/test_integration.py     |  52 ++++++
 16 files changed, 660 insertions(+), 201 deletions(-)
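The headline change in brief: tables can now be altered through a small transaction API, and the REST catalog ships the first `_commit_table` backend. A representative snippet, lifted from the updated docs below (assuming `table` was loaded from a catalog):

```python
with table.transaction() as transaction:
    transaction.set_properties(abc="def")

assert table.properties == {"abc": "def"}
```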

diff --git a/python/mkdocs/docs/api.md b/python/mkdocs/docs/api.md
index ddd5ca180f..d3b8fceee5 100644
--- a/python/mkdocs/docs/api.md
+++ b/python/mkdocs/docs/api.md
@@ -78,110 +78,15 @@ catalog.load_table(("nyc", "taxis"))
 # The tuple syntax can be used if the namespace or table contains a dot.
 ```
 
-This returns a `Table` that represents an Iceberg table:
-
-```python
-Table(
-  identifier=('nyc', 'taxis'),
-  
metadata_location='s3a://warehouse/wh/nyc.db/taxis/metadata/00002-6ea51ce3-62aa-4197-9cf8-43d07c3440ca.metadata.json',
-  metadata=TableMetadataV2(
-    location='s3a://warehouse/wh/nyc.db/taxis',
-    table_uuid=UUID('ebd5d172-2162-453d-b586-1cdce52c1116'),
-    last_updated_ms=1662633437826,
-    last_column_id=19,
-    schemas=[Schema(
-        NestedField(field_id=1, name='VendorID', field_type=LongType(), 
required=False),
-        NestedField(field_id=2, name='tpep_pickup_datetime', 
field_type=TimestamptzType(), required=False),
-        NestedField(field_id=3, name='tpep_dropoff_datetime', 
field_type=TimestamptzType(), required=False),
-        NestedField(field_id=4, name='passenger_count', 
field_type=DoubleType(), required=False),
-        NestedField(field_id=5, name='trip_distance', field_type=DoubleType(), 
required=False),
-        NestedField(field_id=6, name='RatecodeID', field_type=DoubleType(), 
required=False),
-        NestedField(field_id=7, name='store_and_fwd_flag', 
field_type=StringType(), required=False),
-        NestedField(field_id=8, name='PULocationID', field_type=LongType(), 
required=False),
-        NestedField(field_id=9, name='DOLocationID', field_type=LongType(), 
required=False),
-        NestedField(field_id=10, name='payment_type', field_type=LongType(), 
required=False),
-        NestedField(field_id=11, name='fare_amount', field_type=DoubleType(), 
required=False),
-        NestedField(field_id=12, name='extra', field_type=DoubleType(), 
required=False),
-        NestedField(field_id=13, name='mta_tax', field_type=DoubleType(), 
required=False),
-        NestedField(field_id=14, name='tip_amount', field_type=DoubleType(), 
required=False),
-        NestedField(field_id=15, name='tolls_amount', field_type=DoubleType(), 
required=False),
-        NestedField(field_id=16, name='improvement_surcharge', 
field_type=DoubleType(), required=False),
-        NestedField(field_id=17, name='total_amount', field_type=DoubleType(), 
required=False),
-        NestedField(field_id=18, name='congestion_surcharge', 
field_type=DoubleType(), required=False),
-        NestedField(field_id=19, name='airport_fee', field_type=DoubleType(), 
required=False)
-      ),
-      schema_id=0,
-      identifier_field_ids=[]
-    )],
-    current_schema_id=0,
-    partition_specs=[PartitionSpec(spec_id=0)],
-    default_spec_id=0,
-    last_partition_id=999,
-    properties={
-      'owner': 'root',
-      'write.format.default': 'parquet'
-    },
-    current_snapshot_id=8334458494559715805,
-    snapshots=[
-      Snapshot(
-        snapshot_id=7910949481055846233,
-        parent_snapshot_id=None,
-        sequence_number=None,
-        timestamp_ms=1662489306555,
-        
manifest_list='s3a://warehouse/wh/nyc.db/taxis/metadata/snap-7910949481055846233-1-3eb7a2e1-5b7a-4e76-a29a-3e29c176eea4.avro',
-        summary=Summary(
-          Operation.APPEND,
-          **{
-            'spark.app.id': 'local-1662489289173',
-            'added-data-files': '1',
-            'added-records': '2979431',
-            'added-files-size': '46600777',
-            'changed-partition-count': '1',
-            'total-records': '2979431',
-            'total-files-size': '46600777',
-            'total-data-files': '1',
-            'total-delete-files': '0',
-            'total-position-deletes': '0',
-            'total-equality-deletes': '0'
-          }
-        ),
-        schema_id=0
-      ),
-    ],
-    snapshot_log=[
-      SnapshotLogEntry(
-        snapshot_id='7910949481055846233',
-        timestamp_ms=1662489306555
-      )
-    ],
-    metadata_log=[
-      MetadataLogEntry(
-        
metadata_file='s3a://warehouse/wh/nyc.db/taxis/metadata/00000-b58341ba-6a63-4eea-9b2f-e85e47c7d09f.metadata.json',
-        timestamp_ms=1662489306555
-      )
-    ],
-    sort_orders=[SortOrder(order_id=0)],
-    default_sort_order_id=0,
-    refs={
-      'main': SnapshotRef(
-        snapshot_id=8334458494559715805,
-        snapshot_ref_type=SnapshotRefType.BRANCH,
-        min_snapshots_to_keep=None,
-        max_snapshot_age_ms=None,
-        max_ref_age_ms=None
-      )
-    },
-    format_version=2,
-    last_sequence_number=1
-  )
-)
-```
+This returns a `Table` that represents an Iceberg table that can be queried and altered.
 
 ### Directly from a metadata file
 
 To load a table directly from a metadata file (i.e., **without** using a catalog), you can use a `StaticTable` as follows:
 
 ```python
+from pyiceberg.table import StaticTable
+
 table = StaticTable.from_metadata(
     "s3a://warehouse/wh/nyc.db/taxis/metadata/00002-6ea51ce3-62aa-4197-9cf8-43d07c3440ca.metadata.json"
 )
@@ -241,52 +146,37 @@ catalog.create_table(
 )
 ```
 
-Which returns a newly created table:
+### Update table properties
+
+Set and remove properties through the `Transaction` API:
 
 ```python
-Table(
-    identifier=('default', 'bids'),
-    
metadata_location='/Users/fokkodriesprong/Desktop/docker-spark-iceberg/wh/bids//metadata/00000-c8cd93ab-f784-474d-a167-b1a86b05195f.metadata.json',
-    metadata=TableMetadataV2(
-        
location='/Users/fokkodriesprong/Desktop/docker-spark-iceberg/wh/bids/',
-        table_uuid=UUID('38d4cb39-4945-4bf2-b374-984b5c4984d2'),
-        last_updated_ms=1661847562069,
-        last_column_id=4,
-        schemas=[
-            Schema(
-                NestedField(field_id=1, name='datetime', 
field_type=TimestampType(), required=False),
-                NestedField(field_id=2, name='bid', field_type=DoubleType(), 
required=False),
-                NestedField(field_id=3, name='ask', field_type=DoubleType(), 
required=False),
-                NestedField(field_id=4, name='symbol', 
field_type=StringType(), required=False)),
-                schema_id=1,
-                identifier_field_ids=[])
-        ],
-        current_schema_id=1,
-        partition_specs=[
-            PartitionSpec(
-                PartitionField(source_id=1, field_id=1000, 
transform=DayTransform(), name='datetime_day'),))
-        ],
-        default_spec_id=0,
-        last_partition_id=1000,
-        properties={},
-        current_snapshot_id=None,
-        snapshots=[],
-        snapshot_log=[],
-        metadata_log=[],
-        sort_orders=[
-            SortOrder(order_id=1, fields=[SortField(source_id=4, 
transform=IdentityTransform(), direction=SortDirection.ASC, 
null_order=NullOrder.NULLS_FIRST)])
-        ],
-        default_sort_order_id=1,
-        refs={},
-        format_version=2,
-        last_sequence_number=0
-    )
-)
+with table.transaction() as transaction:
+    transaction.set_properties(abc="def")
+
+assert table.properties == {"abc": "def"}
+
+with table.transaction() as transaction:
+    transaction.remove_properties("abc")
+
+assert table.properties == {}
+```
+
+Or, without a context manager:
+
+```python
+table = table.transaction().set_properties(abc="def").commit_transaction()
+
+assert table.properties == {"abc": "def"}
+
+table = table.transaction().remove_properties("abc").commit_transaction()
+
+assert table.properties == {}
 ```
 
-## Query a table
+## Query the data
 
-To query a table, a table scan is needed. A table scan accepts a filter, columns and optionally a limit and a snapshot ID:
+To query a table, a table scan is needed. A table scan accepts a filter, columns, and optionally a limit and a snapshot ID:
 
 ```python
 from pyiceberg.catalog import load_catalog
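The doc hunk above only states what a scan accepts; as a hedged sketch of such a scan (assuming `table` was loaded as above; the field names are illustrative, taken from the example NYC taxis schema):

```python
from pyiceberg.expressions import GreaterThanOrEqual

# A filter, a column projection, and an optional row limit, per the
# sentence above. Parameter names follow the pyiceberg scan API.
scan = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime"),
    limit=100,
)
```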
diff --git a/python/poetry.lock b/python/poetry.lock
index 729f4fc5ea..74dbe1683b 100644
--- a/python/poetry.lock
+++ b/python/poetry.lock
@@ -1703,13 +1703,13 @@ files = [
 
 [[package]]
 name = "platformdirs"
-version = "3.6.0"
+version = "3.7.0"
 description = "A small Python package for determining appropriate 
platform-specific dirs, e.g. a \"user data dir\"."
 optional = false
 python-versions = ">=3.7"
 files = [
-    {file = "platformdirs-3.6.0-py3-none-any.whl", hash = 
"sha256:ffa199e3fbab8365778c4a10e1fbf1b9cd50707de826eb304b50e57ec0cc8d38"},
-    {file = "platformdirs-3.6.0.tar.gz", hash = 
"sha256:57e28820ca8094678b807ff529196506d7a21e17156cb1cddb3e74cebce54640"},
+    {file = "platformdirs-3.7.0-py3-none-any.whl", hash = 
"sha256:cfd065ba43133ff103ab3bd10aecb095c2a0035fcd1f07217c9376900d94ba07"},
+    {file = "platformdirs-3.7.0.tar.gz", hash = 
"sha256:87fbf6473e87c078d536980ba970a472422e94f17b752cfad17024c18876d481"},
 ]
 
 [package.extras]
@@ -1718,13 +1718,13 @@ test = ["appdirs (==1.4.4)", "covdefaults (>=2.3)", 
"pytest (>=7.3.1)", "pytest-
 
 [[package]]
 name = "pluggy"
-version = "1.0.0"
+version = "1.2.0"
 description = "plugin and hook calling mechanisms for python"
 optional = false
-python-versions = ">=3.6"
+python-versions = ">=3.7"
 files = [
-    {file = "pluggy-1.0.0-py2.py3-none-any.whl", hash = 
"sha256:74134bbf457f031a36d68416e1509f34bd5ccc019f0bcc952c7b909d06b37bd3"},
-    {file = "pluggy-1.0.0.tar.gz", hash = 
"sha256:4224373bacce55f955a878bf9cfa763c1e360858e330072059e10bad68531159"},
+    {file = "pluggy-1.2.0-py3-none-any.whl", hash = 
"sha256:c2fd55a7d7a3863cba1a013e4e2414658b1d07b6bc57b3919e0c63c9abb99849"},
+    {file = "pluggy-1.2.0.tar.gz", hash = 
"sha256:d12f0c4b579b15f5e054301bb226ee85eeeba08ffec228092f8defbaa3a4c4b3"},
 ]
 
 [package.extras]
@@ -2186,32 +2186,27 @@ files = [
 
 [[package]]
 name = "ray"
-version = "2.5.0"
+version = "2.5.1"
 description = "Ray provides a simple, universal API for building distributed 
applications."
 optional = true
 python-versions = "*"
 files = [
-    {file = "ray-2.5.0-cp310-cp310-macosx_10_15_universal2.whl", hash = 
"sha256:d1bebc874e896880c1215f4c1a11697ada49fa1595d6d99d7c5b4dc03030df36"},
-    {file = "ray-2.5.0-cp310-cp310-macosx_11_0_arm64.whl", hash = 
"sha256:c0285df2d24cacc36ca64b7852178a9bf37e3fc88545752fc2b46c27396965c1"},
-    {file = "ray-2.5.0-cp310-cp310-manylinux2014_aarch64.whl", hash = 
"sha256:38935d46c2597c1d1f113e1c8f88e2716c67052c480de5b2a0265e0a1a5ce88f"},
-    {file = "ray-2.5.0-cp310-cp310-manylinux2014_x86_64.whl", hash = 
"sha256:d53a07c9a9dbc134945a26980f557e9ff0f591bf8cabed1a6ebf921768d1c8bd"},
-    {file = "ray-2.5.0-cp310-cp310-win_amd64.whl", hash = 
"sha256:ef26ba24461dad98365b48ef01e27e70bc9737f4cf4734115804153d7d9195dc"},
-    {file = "ray-2.5.0-cp311-cp311-manylinux2014_aarch64.whl", hash = 
"sha256:d714175a5000ca91f82646a9b72521118bb6d2db5568e1b7ae9ceb64769716b6"},
-    {file = "ray-2.5.0-cp311-cp311-manylinux2014_x86_64.whl", hash = 
"sha256:0cde929e63497ed5f1c8626e5ccf7595ef6acaf1e7e270ad7c12f8e1c7695244"},
-    {file = "ray-2.5.0-cp37-cp37m-macosx_10_15_x86_64.whl", hash = 
"sha256:7e5512abf62c05c9ff90b1c89a4e0f2e45ee00e73f816eb8265e3ebd92fe4064"},
-    {file = "ray-2.5.0-cp37-cp37m-manylinux2014_aarch64.whl", hash = 
"sha256:3bf36beb213f89c0eb1ec5ac6ffddc8f53e616be745167f00ca017abd8672a2d"},
-    {file = "ray-2.5.0-cp37-cp37m-manylinux2014_x86_64.whl", hash = 
"sha256:59c2448b07f45d9a9d8e594bb5337bd35a5fea04e42cb4211a3346c2c0d066b0"},
-    {file = "ray-2.5.0-cp37-cp37m-win_amd64.whl", hash = 
"sha256:63008dd659d9ef25b0e20f0e1a285e8266e0af68b1178bca1b6ae43e49a68104"},
-    {file = "ray-2.5.0-cp38-cp38-macosx_10_15_x86_64.whl", hash = 
"sha256:e9464e93d6b72e0da69b9c5ab0501cc40f2db14801e22c6b97fa4e8039647892"},
-    {file = "ray-2.5.0-cp38-cp38-macosx_11_0_arm64.whl", hash = 
"sha256:7dc00fac119bfa1c2f8ac456d50a728346d6f2722fb7a21bf70841fc7476c285"},
-    {file = "ray-2.5.0-cp38-cp38-manylinux2014_aarch64.whl", hash = 
"sha256:d76051519bd4ae39fda4a87536978cafdebf2843c1c29a9f734c503d8ea676cd"},
-    {file = "ray-2.5.0-cp38-cp38-manylinux2014_x86_64.whl", hash = 
"sha256:9a8e06dc5d4201129c28b6768a971c474b82a23935b2e40461ffc7f1c2f4942a"},
-    {file = "ray-2.5.0-cp38-cp38-win_amd64.whl", hash = 
"sha256:849014b62ca50ff106b7a5d41430346e2762b1c4c803673af076209925b8f912"},
-    {file = "ray-2.5.0-cp39-cp39-macosx_10_15_x86_64.whl", hash = 
"sha256:a1b52c12a3349d8e37357df31438b6f1b12c7719ef41bdf5089fc7e78e8ab212"},
-    {file = "ray-2.5.0-cp39-cp39-macosx_11_0_arm64.whl", hash = 
"sha256:25f3d50c27c4c4756259d093d152381c6604bb96684a0cf43c55ddcc2eb73f79"},
-    {file = "ray-2.5.0-cp39-cp39-manylinux2014_aarch64.whl", hash = 
"sha256:1cb4f6ef9cfdb69d2ae582f357e977527944390e2f5cbbf51efd8252ed4c9a11"},
-    {file = "ray-2.5.0-cp39-cp39-manylinux2014_x86_64.whl", hash = 
"sha256:662cff303c086369a29283badcd7445b7f911874d8407b2c589b1ccbf6028d2e"},
-    {file = "ray-2.5.0-cp39-cp39-win_amd64.whl", hash = 
"sha256:a2cea10981dad7cfd187edf5e225a667eb114269afc5f2321b52113ef2d86123"},
+    {file = "ray-2.5.1-cp310-cp310-macosx_10_15_universal2.whl", hash = 
"sha256:43c1f3c662692bca6a03a7eb118e310c563ac64901b838c74ec1fe34c28dfce7"},
+    {file = "ray-2.5.1-cp310-cp310-macosx_11_0_arm64.whl", hash = 
"sha256:bd47d617fcfc969d8f22814ebe283f2e7d3de63b3d4d34b968d905b7055f85cd"},
+    {file = "ray-2.5.1-cp310-cp310-manylinux2014_x86_64.whl", hash = 
"sha256:75f2f91c80cf2cbfee630175bd0fff6ba109bcc054e378229134dd2736083dd7"},
+    {file = "ray-2.5.1-cp310-cp310-win_amd64.whl", hash = 
"sha256:03bc24f865da7ad94b142c5c12b3827a95505cc1968366c825990b183d54a797"},
+    {file = "ray-2.5.1-cp311-cp311-manylinux2014_x86_64.whl", hash = 
"sha256:8f561bb6067e5c10af4eb0f8c9185dedda5a0214fae4b187675632220dab5b69"},
+    {file = "ray-2.5.1-cp37-cp37m-macosx_10_15_x86_64.whl", hash = 
"sha256:2f3a3667b5f40ed984ddab9854f3a0e9d88e12b8c078dd1b7964a7276693570d"},
+    {file = "ray-2.5.1-cp37-cp37m-manylinux2014_x86_64.whl", hash = 
"sha256:9c4ede34d265fe52da74c8966e45c5d2f4c27a2d5fb68d4828bc3b428050ca39"},
+    {file = "ray-2.5.1-cp37-cp37m-win_amd64.whl", hash = 
"sha256:e507339b6bb0bb85433dbce4e073f125a6e1da1009658963de0eaeb7ef4b7a93"},
+    {file = "ray-2.5.1-cp38-cp38-macosx_10_15_x86_64.whl", hash = 
"sha256:4019a5000b23909c004dba4cc6b7457ffc798ace2db5489d2a2d7c37cb053323"},
+    {file = "ray-2.5.1-cp38-cp38-macosx_11_0_arm64.whl", hash = 
"sha256:0289d3b6ecc86a260e51d23f7b04875ace5f9716ffd9a5a9f0072120d0866c37"},
+    {file = "ray-2.5.1-cp38-cp38-manylinux2014_x86_64.whl", hash = 
"sha256:e7e91a93fced3695405d61849dff4efb8cec74f669b22c0276164136992f4528"},
+    {file = "ray-2.5.1-cp38-cp38-win_amd64.whl", hash = 
"sha256:0e92c41c9326378bf2cdab3ae401a6279cda496da58d7a60564999de5dedcd2f"},
+    {file = "ray-2.5.1-cp39-cp39-macosx_10_15_x86_64.whl", hash = 
"sha256:c7ca8df922d2a090c4f4a6e017e43e1da98b30e3ed086fc01f25624874c6b64a"},
+    {file = "ray-2.5.1-cp39-cp39-macosx_11_0_arm64.whl", hash = 
"sha256:3cbe4284ee952bd9379210f08a2917bc7e74efb6ec71bd4f156e282e031cdbec"},
+    {file = "ray-2.5.1-cp39-cp39-manylinux2014_x86_64.whl", hash = 
"sha256:a9b8bdcfce1646df2cdc89456bf6e1ff57091c433789e1fc34a8caf3a874b8c8"},
+    {file = "ray-2.5.1-cp39-cp39-win_amd64.whl", hash = 
"sha256:5d6689e95706ba7ba4d13f9e09aed1498cee5736e0b74c61331585a3c1b086bf"},
 ]
 
 [package.dependencies]
@@ -2239,8 +2234,8 @@ requests = "*"
 
 [package.extras]
 air = ["aiohttp (>=3.7)", "aiohttp-cors", "aiorwlock", "colorful", "fastapi", 
"fsspec", "gpustat (>=1.0.0)", "numpy (>=1.20)", "opencensus", "pandas", 
"pandas (>=1.3)", "prometheus-client (>=0.7.1)", "py-spy (>=0.2.0)", "pyarrow 
(>=6.0.1)", "pydantic", "requests", "smart-open", "starlette", "tensorboardX 
(>=1.9)", "uvicorn", "virtualenv (>=20.0.24,<20.21.1)"]
-all = ["aiohttp (>=3.7)", "aiohttp-cors", "aiorwlock", "colorful", "dm-tree", 
"fastapi", "fsspec", "gpustat (>=1.0.0)", "gymnasium (==0.26.3)", "kubernetes", 
"lz4", "numpy (>=1.20)", "opencensus", "opentelemetry-api", 
"opentelemetry-exporter-otlp", "opentelemetry-sdk", "pandas", "pandas (>=1.3)", 
"prometheus-client (>=0.7.1)", "py-spy (>=0.2.0)", "pyarrow (>=6.0.1)", 
"pydantic", "pyyaml", "ray-cpp (==2.5.0)", "requests", "rich", "scikit-image", 
"scipy", "smart-open", "starlette", "tensor [...]
-cpp = ["ray-cpp (==2.5.0)"]
+all = ["aiohttp (>=3.7)", "aiohttp-cors", "aiorwlock", "colorful", "dm-tree", 
"fastapi", "fsspec", "gpustat (>=1.0.0)", "gymnasium (==0.26.3)", "kubernetes", 
"lz4", "numpy (>=1.20)", "opencensus", "opentelemetry-api", 
"opentelemetry-exporter-otlp", "opentelemetry-sdk", "pandas", "pandas (>=1.3)", 
"prometheus-client (>=0.7.1)", "py-spy (>=0.2.0)", "pyarrow (>=6.0.1)", 
"pydantic", "pyyaml", "ray-cpp (==2.5.1)", "requests", "rich", "scikit-image", 
"scipy", "smart-open", "starlette", "tensor [...]
+cpp = ["ray-cpp (==2.5.1)"]
 data = ["fsspec", "numpy (>=1.20)", "pandas (>=1.3)", "pyarrow (>=6.0.1)"]
 default = ["aiohttp (>=3.7)", "aiohttp-cors", "colorful", "gpustat (>=1.0.0)", 
"opencensus", "prometheus-client (>=0.7.1)", "py-spy (>=0.2.0)", "pydantic", 
"requests", "smart-open", "virtualenv (>=20.0.24,<20.21.1)"]
 k8s = ["kubernetes", "urllib3"]
diff --git a/python/pyiceberg/catalog/__init__.py b/python/pyiceberg/catalog/__init__.py
index 490d9c2060..f1be22b99e 100644
--- a/python/pyiceberg/catalog/__init__.py
+++ b/python/pyiceberg/catalog/__init__.py
@@ -40,7 +40,12 @@ from pyiceberg.manifest import ManifestFile
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.serializers import ToOutputFile
-from pyiceberg.table import Table, TableMetadata
+from pyiceberg.table import (
+    CommitTableRequest,
+    CommitTableResponse,
+    Table,
+    TableMetadata,
+)
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
 from pyiceberg.typedef import (
     EMPTY_DICT,
@@ -322,6 +327,20 @@ class Catalog(ABC):
             NoSuchTableError: If a table with the name does not exist.
         """
 
+    @abstractmethod
+    def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
+        """Updates the table.
+
+        Args:
+            table_request (CommitTableRequest): The table request to be carried out.
+
+        Returns:
+            CommitTableResponse: The updated metadata.
+
+        Raises:
+            NoSuchTableError: If a table with the given identifier does not exist.
+        """
+
     @abstractmethod
     def create_namespace(self, namespace: Union[str, Identifier], properties: 
Properties = EMPTY_DICT) -> None:
         """Create a namespace in the catalog.
@@ -392,7 +411,7 @@ class Catalog(ABC):
 
     @abstractmethod
     def update_namespace_properties(
-        self, namespace: Union[str, Identifier], removals: set[str] | None = None, updates: Properties = EMPTY_DICT
+        self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT
     ) -> PropertiesUpdateSummary:
         """Removes provided property keys and updates properties for a 
namespace.
 
diff --git a/python/pyiceberg/catalog/dynamodb.py b/python/pyiceberg/catalog/dynamodb.py
index 642f6fbabf..d11ffb5d39 100644
--- a/python/pyiceberg/catalog/dynamodb.py
+++ b/python/pyiceberg/catalog/dynamodb.py
@@ -52,7 +52,7 @@ from pyiceberg.io import load_file_io
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import Table
+from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
 from pyiceberg.table.metadata import new_table_metadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
 from pyiceberg.typedef import EMPTY_DICT
@@ -168,6 +168,20 @@ class DynamoDbCatalog(Catalog):
 
         return self.load_table(identifier=identifier)
 
+    def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
+        """Updates the table.
+
+        Args:
+            table_request (CommitTableRequest): The table request to be carried out.
+
+        Returns:
+            CommitTableResponse: The updated metadata.
+
+        Raises:
+            NoSuchTableError: If a table with the given identifier does not exist.
+        """
+        raise NotImplementedError
+
     def load_table(self, identifier: Union[str, Identifier]) -> Table:
         """
         Loads the table's metadata and returns the table instance.
@@ -577,6 +591,7 @@ class DynamoDbCatalog(Catalog):
             metadata=metadata,
             metadata_location=metadata_location,
             io=self._load_file_io(metadata.properties, metadata_location),
+            catalog=self,
         )
 
 
diff --git a/python/pyiceberg/catalog/glue.py b/python/pyiceberg/catalog/glue.py
index 24c077615e..7e06f2e47f 100644
--- a/python/pyiceberg/catalog/glue.py
+++ b/python/pyiceberg/catalog/glue.py
@@ -51,7 +51,7 @@ from pyiceberg.io import load_file_io
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import Table
+from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
 from pyiceberg.table.metadata import new_table_metadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
 from pyiceberg.typedef import EMPTY_DICT
@@ -170,6 +170,7 @@ class GlueCatalog(Catalog):
             metadata=metadata,
             metadata_location=metadata_location,
             io=self._load_file_io(metadata.properties, metadata_location),
+            catalog=self,
         )
 
     def _create_glue_table(self, database_name: str, table_name: str, 
table_input: Dict[str, Any]) -> None:
@@ -224,6 +225,20 @@ class GlueCatalog(Catalog):
 
         return self.load_table(identifier=identifier)
 
+    def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
+        """Updates the table.
+
+        Args:
+            table_request (CommitTableRequest): The table request to be carried out.
+
+        Returns:
+            CommitTableResponse: The updated metadata.
+
+        Raises:
+            NoSuchTableError: If a table with the given identifier does not exist.
+        """
+        raise NotImplementedError
+
     def load_table(self, identifier: Union[str, Identifier]) -> Table:
         """Loads the table's metadata and returns the table instance.
 
diff --git a/python/pyiceberg/catalog/hive.py b/python/pyiceberg/catalog/hive.py
index b8c91796a2..839fb2a3d5 100644
--- a/python/pyiceberg/catalog/hive.py
+++ b/python/pyiceberg/catalog/hive.py
@@ -66,7 +66,7 @@ from pyiceberg.io import FileIO, load_file_io
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema, SchemaVisitor, visit
 from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import Table
+from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
 from pyiceberg.table.metadata import new_table_metadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
 from pyiceberg.typedef import EMPTY_DICT
@@ -243,6 +243,7 @@ class HiveCatalog(Catalog):
             metadata=metadata,
             metadata_location=metadata_location,
             io=self._load_file_io(metadata.properties, metadata_location),
+            catalog=self,
         )
 
     def create_table(
@@ -302,6 +303,20 @@ class HiveCatalog(Catalog):
 
         return self._convert_hive_into_iceberg(hive_table, io)
 
+    def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
+        """Updates the table.
+
+        Args:
+            table_request (CommitTableRequest): The table request to be carried out.
+
+        Returns:
+            CommitTableResponse: The updated metadata.
+
+        Raises:
+            NoSuchTableError: If a table with the given identifier does not exist.
+        """
+        raise NotImplementedError
+
     def load_table(self, identifier: Union[str, Identifier]) -> Table:
         """Loads the table's metadata and returns the table instance.
 
diff --git a/python/pyiceberg/catalog/noop.py b/python/pyiceberg/catalog/noop.py
new file mode 100644
index 0000000000..bb93772aa7
--- /dev/null
+++ b/python/pyiceberg/catalog/noop.py
@@ -0,0 +1,79 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+from typing import (
+    List,
+    Optional,
+    Set,
+    Union,
+)
+
+from pyiceberg.catalog import Catalog, PropertiesUpdateSummary
+from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.table import (
+    CommitTableRequest,
+    CommitTableResponse,
+    SortOrder,
+    Table,
+)
+from pyiceberg.table.sorting import UNSORTED_SORT_ORDER
+from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
+
+
+class NoopCatalog(Catalog):
+    def create_table(
+        self,
+        identifier: Union[str, Identifier],
+        schema: Schema,
+        location: Optional[str] = None,
+        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+        sort_order: SortOrder = UNSORTED_SORT_ORDER,
+        properties: Properties = EMPTY_DICT,
+    ) -> Table:
+        raise NotImplementedError
+
+    def load_table(self, identifier: Union[str, Identifier]) -> Table:
+        raise NotImplementedError
+
+    def drop_table(self, identifier: Union[str, Identifier]) -> None:
+        raise NotImplementedError
+
+    def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
+        raise NotImplementedError
+
+    def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
+        raise NotImplementedError
+
+    def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
+        raise NotImplementedError
+
+    def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
+        raise NotImplementedError
+
+    def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
+        raise NotImplementedError
+
+    def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
+        raise NotImplementedError
+
+    def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
+        raise NotImplementedError
+
+    def update_namespace_properties(
+        self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT
+    ) -> PropertiesUpdateSummary:
+        raise NotImplementedError
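`NoopCatalog` exists so that every `Table` can carry a `catalog` reference even when there is no real catalog behind it (the `StaticTable` change further down wires one in). A small sketch of the resulting behavior:

```python
from pyiceberg.catalog.noop import NoopCatalog
from pyiceberg.table import CommitTableRequest

catalog = NoopCatalog("noop")
request = CommitTableRequest(identifier=("db", "tbl"))  # arbitrary identifier

try:
    catalog._commit_table(request)  # every NoopCatalog method raises
except NotImplementedError:
    print("NoopCatalog rejects commits, as expected")
```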
diff --git a/python/pyiceberg/catalog/rest.py b/python/pyiceberg/catalog/rest.py
index 4c36390171..56b8d9316e 100644
--- a/python/pyiceberg/catalog/rest.py
+++ b/python/pyiceberg/catalog/rest.py
@@ -22,6 +22,7 @@ from typing import (
     Literal,
     Optional,
     Set,
+    Tuple,
     Type,
     Union,
 )
@@ -42,6 +43,8 @@ from pyiceberg.catalog import (
 from pyiceberg.exceptions import (
     AuthorizationExpiredError,
     BadRequestError,
+    CommitFailedException,
+    CommitStateUnknownException,
     ForbiddenError,
     NamespaceAlreadyExistsError,
     NoSuchNamespaceError,
@@ -55,7 +58,12 @@ from pyiceberg.exceptions import (
 )
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema
-from pyiceberg.table import Table, TableMetadata
+from pyiceberg.table import (
+    CommitTableRequest,
+    CommitTableResponse,
+    Table,
+    TableMetadata,
+)
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
 from pyiceberg.typedef import EMPTY_DICT, IcebergBaseModel
 
@@ -68,7 +76,7 @@ class Endpoints:
     create_namespace: str = "namespaces"
     load_namespace_metadata: str = "namespaces/{namespace}"
     drop_namespace: str = "namespaces/{namespace}"
-    update_properties: str = "namespaces/{namespace}/properties"
+    update_namespace_properties: str = "namespaces/{namespace}/properties"
     list_tables: str = "namespaces/{namespace}/tables"
     create_table: str = "namespaces/{namespace}/tables"
     load_table: str = "namespaces/{namespace}/tables/{table}"
@@ -393,6 +401,17 @@ class RestCatalog(Catalog):
 
         session.mount(self.uri, SigV4Adapter(**self.properties))
 
+    def _response_to_table(self, identifier_tuple: Tuple[str, ...], table_response: TableResponse) -> Table:
+        return Table(
+            identifier=(self.name,) + identifier_tuple if self.name else identifier_tuple,
+            metadata_location=table_response.metadata_location,
+            metadata=table_response.metadata,
+            io=self._load_file_io(
+                {**table_response.metadata.properties, **table_response.config}, table_response.metadata_location
+            ),
+            catalog=self,
+        )
+
     def create_table(
         self,
         identifier: Union[str, Identifier],
@@ -422,15 +441,7 @@ class RestCatalog(Catalog):
             self._handle_non_200_response(exc, {409: TableAlreadyExistsError})
 
         table_response = TableResponse(**response.json())
-
-        return Table(
-            identifier=(self.name,) + self.identifier_to_tuple(identifier),
-            metadata_location=table_response.metadata_location,
-            metadata=table_response.metadata,
-            io=self._load_file_io(
-                {**table_response.metadata.properties, **table_response.config}, table_response.metadata_location
-            ),
-        )
+        return self._response_to_table(self.identifier_to_tuple(identifier), table_response)
 
     def list_tables(self, namespace: Union[str, Identifier]) -> 
List[Identifier]:
         namespace_tuple = self._check_valid_namespace_identifier(namespace)
@@ -455,14 +466,7 @@ class RestCatalog(Catalog):
             self._handle_non_200_response(exc, {404: NoSuchTableError})
 
         table_response = TableResponse(**response.json())
-        return Table(
-            identifier=(self.name,) + identifier_tuple if self.name else identifier_tuple,
-            metadata_location=table_response.metadata_location,
-            metadata=table_response.metadata,
-            io=self._load_file_io(
-                {**table_response.metadata.properties, **table_response.config}, table_response.metadata_location
-            ),
-        )
+        return self._response_to_table(identifier_tuple, table_response)
 
     def drop_table(self, identifier: Union[str, Identifier], purge_requested: 
bool = False) -> None:
         response = self._session.delete(
@@ -489,6 +493,36 @@ class RestCatalog(Catalog):
 
         return self.load_table(to_identifier)
 
+    def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
+        """Updates the table.
+
+        Args:
+            table_request (CommitTableRequest): The table request to be carried out.
+
+        Returns:
+            CommitTableResponse: The updated metadata.
+
+        Raises:
+            NoSuchTableError: If a table with the given identifier does not exist.
+        """
+        response = self._session.post(
+            self.url(Endpoints.update_table, prefixed=True, **self._split_identifier_for_path(table_request.identifier)),
+            data=table_request.json(),
+        )
+        try:
+            response.raise_for_status()
+        except HTTPError as exc:
+            self._handle_non_200_response(
+                exc,
+                {
+                    409: CommitFailedException,
+                    500: CommitStateUnknownException,
+                    502: CommitStateUnknownException,
+                    504: CommitStateUnknownException,
+                },
+            )
+        return CommitTableResponse(**response.json())
+
     def create_namespace(self, namespace: Union[str, Identifier], properties: 
Properties = EMPTY_DICT) -> None:
         namespace_tuple = self._check_valid_namespace_identifier(namespace)
         payload = {"namespace": namespace_tuple, "properties": properties}
@@ -541,7 +575,7 @@ class RestCatalog(Catalog):
         namespace_tuple = self._check_valid_namespace_identifier(namespace)
         namespace = NAMESPACE_SEPARATOR.join(namespace_tuple)
         payload = {"removals": list(removals or []), "updates": updates}
-        response = self._session.post(self.url(Endpoints.update_properties, namespace=namespace), json=payload)
+        response = self._session.post(self.url(Endpoints.update_namespace_properties, namespace=namespace), json=payload)
         try:
             response.raise_for_status()
         except HTTPError as exc:
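The body POSTed by `_commit_table` above is just the serialized `CommitTableRequest`. A quick way to inspect what goes over the wire (the identifier and property values here are made up):

```python
from pyiceberg.table import CommitTableRequest, SetPropertiesUpdate

request = CommitTableRequest(
    identifier=("default", "taxis"),  # catalog name already stripped
    updates=[SetPropertiesUpdate(updates={"owner": "fokko"})],
)
# Action discriminators serialize to their REST aliases, e.g. "set-properties".
print(request.json())
```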
diff --git a/python/pyiceberg/exceptions.py b/python/pyiceberg/exceptions.py
index 65b20caa88..f555543723 100644
--- a/python/pyiceberg/exceptions.py
+++ b/python/pyiceberg/exceptions.py
@@ -102,3 +102,11 @@ class ConditionalCheckFailedException(DynamoDbError):
 
 class GenericDynamoDbError(DynamoDbError):
     pass
+
+
+class CommitFailedException(RESTError):
+    """Commit failed, refresh and try again."""
+
+
+class CommitStateUnknownException(RESTError):
+    """Commit failed for an unknown reason, leaving the commit state unknown."""
diff --git a/python/pyiceberg/table/__init__.py b/python/pyiceberg/table/__init__.py
index 9c095a5319..0d433a65c6 100644
--- a/python/pyiceberg/table/__init__.py
+++ b/python/pyiceberg/table/__init__.py
@@ -18,6 +18,7 @@ from __future__ import annotations
 
 from abc import ABC, abstractmethod
 from dataclasses import dataclass
+from enum import Enum
 from functools import cached_property
 from itertools import chain
 from multiprocessing.pool import ThreadPool
@@ -28,6 +29,7 @@ from typing import (
     Dict,
     Iterable,
     List,
+    Literal,
     Optional,
     Set,
     Tuple,
@@ -63,6 +65,7 @@ from pyiceberg.table.snapshots import Snapshot, SnapshotLogEntry
 from pyiceberg.table.sorting import SortOrder
 from pyiceberg.typedef import (
     EMPTY_DICT,
+    IcebergBaseModel,
     Identifier,
     KeyDefaultDict,
     Properties,
@@ -74,24 +77,327 @@ if TYPE_CHECKING:
     import ray
     from duckdb import DuckDBPyConnection
 
+    from pyiceberg.catalog import Catalog
+
+
 ALWAYS_TRUE = AlwaysTrue()
 
 
+class Transaction:
+    _table: Table
+    _updates: Tuple[TableUpdate, ...]
+    _requirements: Tuple[TableRequirement, ...]
+
+    def __init__(
+        self,
+        table: Table,
+        actions: Optional[Tuple[TableUpdate, ...]] = None,
+        requirements: Optional[Tuple[TableRequirement, ...]] = None,
+    ):
+        self._table = table
+        self._updates = actions or ()
+        self._requirements = requirements or ()
+
+    def __enter__(self) -> Transaction:
+        return self
+
+    def __exit__(self, _: Any, value: Any, traceback: Any) -> None:
+        fresh_table = self.commit_transaction()
+        # Update the table metadata in place
+        self._table.metadata = fresh_table.metadata
+        self._table.metadata_location = fresh_table.metadata_location
+
+    def _append_updates(self, *new_updates: TableUpdate) -> Transaction:
+        """Appends updates to the set of staged updates.
+
+        Args:
+            *new_updates: Any new updates.
+
+        Raises:
+            ValueError: When the type of update is not unique.
+
+        Returns:
+            The transaction with the new updates appended.
+        """
+        for new_update in new_updates:
+            type_new_update = type(new_update)
+            if any(type(update) == type_new_update for update in self._updates):
+                raise ValueError(f"Updates in a single commit need to be unique, duplicate: {type_new_update}")
+        self._updates = self._updates + new_updates
+        return self
+
+    def set_table_version(self, format_version: Literal[1, 2]) -> Transaction:
+        """Sets the table to a certain version.
+
+        Args:
+            format_version: The new format version.
+
+        Returns:
+            The transaction builder.
+        """
+        raise NotImplementedError("Not yet implemented")
+
+    def set_properties(self, **updates: str) -> Transaction:
+        """Set properties.
+
+        When a property is already set, it will be overwritten.
+
+        Args:
+            updates: The properties set on the table.
+
+        Returns:
+            The transaction builder.
+        """
+        return self._append_updates(SetPropertiesUpdate(updates=updates))
+
+    def remove_properties(self, *removals: str) -> Transaction:
+        """Removes properties.
+
+        Args:
+            removals: Properties to be removed.
+
+        Returns:
+            The transaction builder.
+        """
+        return self._append_updates(RemovePropertiesUpdate(removals=removals))
+
+    def update_location(self, location: str) -> Transaction:
+        """Sets the new table location.
+
+        Args:
+            location: The new location of the table.
+
+        Returns:
+            The transaction builder.
+        """
+        raise NotImplementedError("Not yet implemented")
+
+    def commit_transaction(self) -> Table:
+        """Commits the changes to the catalog.
+
+        Returns:
+            The table with the updates applied.
+        """
+        # Strip the catalog name
+        if len(self._updates) > 0:
+            response = self._table.catalog._commit_table(  # pylint: disable=W0212
+                CommitTableRequest(
+                    identifier=self._table.identifier[1:],
+                    requirements=self._requirements,
+                    updates=self._updates,
+                )
+            )
+            # Update the metadata with the new one
+            self._table.metadata = response.metadata
+            self._table.metadata_location = response.metadata_location
+
+            return self._table
+        else:
+            return self._table
+
+
+class TableUpdateAction(Enum):
+    upgrade_format_version = "upgrade-format-version"
+    add_schema = "add-schema"
+    set_current_schema = "set-current-schema"
+    add_spec = "add-spec"
+    set_default_spec = "set-default-spec"
+    add_sort_order = "add-sort-order"
+    set_default_sort_order = "set-default-sort-order"
+    add_snapshot = "add-snapshot"
+    set_snapshot_ref = "set-snapshot-ref"
+    remove_snapshots = "remove-snapshots"
+    remove_snapshot_ref = "remove-snapshot-ref"
+    set_location = "set-location"
+    set_properties = "set-properties"
+    remove_properties = "remove-properties"
+
+
+class TableUpdate(IcebergBaseModel):
+    action: TableUpdateAction
+
+
+class UpgradeFormatVersionUpdate(TableUpdate):
+    action = TableUpdateAction.upgrade_format_version
+    format_version: int = Field(alias="format-version")
+
+
+class AddSchemaUpdate(TableUpdate):
+    action = TableUpdateAction.add_schema
+    schema_: Schema = Field(alias="schema")
+
+
+class SetCurrentSchemaUpdate(TableUpdate):
+    action = TableUpdateAction.set_current_schema
+    schema_id: int = Field(
+        alias="schema-id", description="Schema ID to set as current, or -1 to 
set last added schema", default=-1
+    )
+
+
+class AddPartitionSpecUpdate(TableUpdate):
+    action = TableUpdateAction.add_spec
+    spec: PartitionSpec
+
+
+class SetDefaultSpecUpdate(TableUpdate):
+    action = TableUpdateAction.set_default_spec
+    spec_id: int = Field(
+        alias="spec-id", description="Partition spec ID to set as the default, 
or -1 to set last added spec", default=-1
+    )
+
+
+class AddSortOrderUpdate(TableUpdate):
+    action = TableUpdateAction.add_sort_order
+    sort_order: SortOrder = Field(alias="sort-order")
+
+
+class SetDefaultSortOrderUpdate(TableUpdate):
+    action = TableUpdateAction.set_default_sort_order
+    sort_order_id: int = Field(
+        alias="sort-order-id", description="Sort order ID to set as the 
default, or -1 to set last added sort order", default=-1
+    )
+
+
+class AddSnapshotUpdate(TableUpdate):
+    action = TableUpdateAction.add_snapshot
+    snapshot: Snapshot
+
+
+class SetSnapshotRefUpdate(TableUpdate):
+    action = TableUpdateAction.set_snapshot_ref
+    ref_name: str = Field(alias="ref-name")
+    type: Literal["tag", "branch"]
+    snapshot_id: int = Field(alias="snapshot-id")
+    max_age_ref_ms: int = Field(alias="max-ref-age-ms")
+    max_snapshot_age_ms: int = Field(alias="max-snapshot-age-ms")
+    min_snapshots_to_keep: int = Field(alias="min-snapshots-to-keep")
+
+
+class RemoveSnapshotsUpdate(TableUpdate):
+    action = TableUpdateAction.remove_snapshots
+    snapshot_ids: List[int] = Field(alias="snapshot-ids")
+
+
+class RemoveSnapshotRefUpdate(TableUpdate):
+    action = TableUpdateAction.remove_snapshot_ref
+    ref_name: str = Field(alias="ref-name")
+
+
+class SetLocationUpdate(TableUpdate):
+    action = TableUpdateAction.set_location
+    location: str
+
+
+class SetPropertiesUpdate(TableUpdate):
+    action = TableUpdateAction.set_properties
+    updates: Dict[str, str]
+
+
+class RemovePropertiesUpdate(TableUpdate):
+    action = TableUpdateAction.remove_properties
+    removals: List[str]
+
+
+class TableRequirement(IcebergBaseModel):
+    type: str
+
+
+class AssertCreate(TableRequirement):
+    """The table must not already exist; used for create transactions."""
+
+    type: Literal["assert-create"]
+
+
+class AssertTableUUID(TableRequirement):
+    """The table UUID must match the requirement's `uuid`."""
+
+    type: Literal["assert-table-uuid"]
+    uuid: str
+
+
+class AssertRefSnapshotId(TableRequirement):
+    """The table branch or tag identified by the requirement's `ref` must 
reference the requirement's `snapshot-id`.
+
+    if `snapshot-id` is `null` or missing, the ref must not already exist.
+    """
+
+    type: Literal["assert-ref-snapshot-id"]
+    ref: str
+    snapshot_id: int = Field(..., alias="snapshot-id")
+
+
+class AssertLastAssignedFieldId(TableRequirement):
+    """The table's last assigned column id must match the requirement's 
`last-assigned-field-id`."""
+
+    type: Literal["assert-last-assigned-field-id"]
+    last_assigned_field_id: int = Field(..., alias="last-assigned-field-id")
+
+
+class AssertCurrentSchemaId(TableRequirement):
+    """The table's current schema id must match the requirement's 
`current-schema-id`."""
+
+    type: Literal["assert-current-schema-id"]
+    current_schema_id: int = Field(..., alias="current-schema-id")
+
+
+class AssertLastAssignedPartitionId(TableRequirement):
+    """The table's last assigned partition id must match the requirement's 
`last-assigned-partition-id`."""
+
+    type: Literal["assert-last-assigned-partition-id"]
+    last_assigned_partition_id: int = Field(..., 
alias="last-assigned-partition-id")
+
+
+class AssertDefaultSpecId(TableRequirement):
+    """The table's default spec id must match the requirement's 
`default-spec-id`."""
+
+    type: Literal["assert-default-spec-id"]
+    default_spec_id: int = Field(..., alias="default-spec-id")
+
+
+class AssertDefaultSortOrderId(TableRequirement):
+    """The table's default sort order id must match the requirement's 
`default-sort-order-id`."""
+
+    type: Literal["assert-default-sort-order-id"]
+    default_sort_order_id: int = Field(..., alias="default-sort-order-id")
+
+
+class CommitTableRequest(IcebergBaseModel):
+    identifier: Identifier = Field()
+    requirements: List[TableRequirement] = Field(default_factory=list)
+    updates: List[TableUpdate] = Field(default_factory=list)
+
+
+class CommitTableResponse(IcebergBaseModel):
+    metadata: TableMetadata = Field()
+    metadata_location: str = Field(alias="metadata-location")
+
+
 class Table:
     identifier: Identifier = Field()
     metadata: TableMetadata = Field()
     metadata_location: str = Field()
     io: FileIO
+    catalog: Catalog
 
-    def __init__(self, identifier: Identifier, metadata: TableMetadata, metadata_location: str, io: FileIO) -> None:
+    def __init__(
+        self, identifier: Identifier, metadata: TableMetadata, metadata_location: str, io: FileIO, catalog: Catalog
+    ) -> None:
         self.identifier = identifier
         self.metadata = metadata
         self.metadata_location = metadata_location
         self.io = io
+        self.catalog = catalog
+
+    def transaction(self) -> Transaction:
+        return Transaction(self)
 
     def refresh(self) -> Table:
         """Refresh the current table metadata."""
-        raise NotImplementedError("To be implemented")
+        fresh = self.catalog.load_table(self.identifier[1:])
+        self.metadata = fresh.metadata
+        self.io = fresh.io
+        self.metadata_location = fresh.metadata_location
+        return self
 
     def name(self) -> Identifier:
         """Return the identifier of this table."""
@@ -142,6 +448,11 @@ class Table:
         """Return a dict of the sort orders of this table."""
         return {sort_order.order_id: sort_order for sort_order in 
self.metadata.sort_orders}
 
+    @property
+    def properties(self) -> Dict[str, str]:
+        """Properties of the table."""
+        return self.metadata.properties
+
     def location(self) -> str:
         """Return the table's base location."""
         return self.metadata.location
@@ -195,11 +506,14 @@ class StaticTable(Table):
 
         metadata = FromInputFile.table_metadata(file)
 
+        from pyiceberg.catalog.noop import NoopCatalog
+
         return cls(
             identifier=("static-table", metadata_location),
             metadata_location=metadata_location,
             metadata=metadata,
             io=load_file_io({**properties, **metadata.properties}),
+            catalog=NoopCatalog("static-table"),
         )
 
 
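One subtlety in the `Transaction` class above: `_append_updates` allows each update type at most once per commit, so chaining two `set_properties` calls raises rather than merging. A small sketch, assuming `table` is a loaded `Table`:

```python
transaction = table.transaction().set_properties(abc="def")
try:
    transaction.set_properties(ghi="jkl")
except ValueError as err:
    print(err)  # Updates in a single commit need to be unique, duplicate: ...
```

Passing both keys to a single `set_properties(abc="def", ghi="jkl")` call avoids the restriction, since it produces one `SetPropertiesUpdate`.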
diff --git a/python/tests/catalog/test_base.py b/python/tests/catalog/test_base.py
index 742549a654..29b63e0900 100644
--- a/python/tests/catalog/test_base.py
+++ b/python/tests/catalog/test_base.py
@@ -42,7 +42,7 @@ from pyiceberg.exceptions import (
 from pyiceberg.io import load_file_io
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, 
PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
-from pyiceberg.table import Table
+from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
 from pyiceberg.table.metadata import TableMetadataV1
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
 from pyiceberg.transforms import IdentityTransform
@@ -103,10 +103,14 @@ class InMemoryCatalog(Catalog):
                 ),
                 
metadata_location=f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json',
                 io=load_file_io(),
+                catalog=self,
             )
             self.__tables[identifier] = table
             return table
 
+    def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
+        raise NotImplementedError
+
     def load_table(self, identifier: Union[str, Identifier]) -> Table:
         identifier = Catalog.identifier_to_tuple(identifier)
         try:
@@ -141,6 +145,7 @@ class InMemoryCatalog(Catalog):
             metadata=table.metadata,
             metadata_location=table.metadata_location,
             io=load_file_io(),
+            catalog=self,
         )
         return self.__tables[to_identifier]
 
diff --git a/python/tests/catalog/test_rest.py b/python/tests/catalog/test_rest.py
index 6f3cafffc1..a7663ac511 100644
--- a/python/tests/catalog/test_rest.py
+++ b/python/tests/catalog/test_rest.py
@@ -408,7 +408,8 @@ def test_load_table_200(rest_mock: Mocker) -> None:
         status_code=200,
         request_headers=TEST_HEADERS,
     )
-    actual = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).load_table(("fokko", "table"))
+    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
+    actual = catalog.load_table(("fokko", "table"))
     expected = Table(
         identifier=("rest", "fokko", "table"),
         
metadata_location="s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
@@ -484,6 +485,7 @@ def test_load_table_200(rest_mock: Mocker) -> None:
             partition_spec=[],
         ),
         io=load_file_io(),
+        catalog=catalog,
     )
     assert actual == expected
 
@@ -585,7 +587,8 @@ def test_create_table_200(rest_mock: Mocker, table_schema_simple: Schema) -> Non
         status_code=200,
         request_headers=TEST_HEADERS,
     )
-    table = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).create_table(
+    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
+    table = catalog.create_table(
         identifier=("fokko", "fokko2"),
         schema=table_schema_simple,
         location=None,
@@ -639,6 +642,7 @@ def test_create_table_200(rest_mock: Mocker, table_schema_simple: Schema) -> Non
             partition_spec=[],
         ),
         io=load_file_io(),
+        catalog=catalog,
     )
 
 
diff --git a/python/tests/cli/test_console.py b/python/tests/cli/test_console.py
index 555071dd0d..4aaec26a78 100644
--- a/python/tests/cli/test_console.py
+++ b/python/tests/cli/test_console.py
@@ -26,12 +26,13 @@ from unittest import mock
 from click.testing import CliRunner
 
 from pyiceberg.catalog import Catalog, PropertiesUpdateSummary
+from pyiceberg.catalog.noop import NoopCatalog
 from pyiceberg.cli.console import run
 from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError
 from pyiceberg.io import load_file_io
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema
-from pyiceberg.table import Table
+from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
 from pyiceberg.table.metadata import TableMetadataV2
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
 from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
@@ -115,8 +116,12 @@ class MockCatalog(Catalog):
             metadata_location="s3://tmp/",
             metadata=TableMetadataV2(**EXAMPLE_TABLE_METADATA_V2),
             io=load_file_io(),
+            catalog=self,
         )
 
+    def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
+        raise NotImplementedError
+
     def load_table(self, identifier: Union[str, Identifier]) -> Table:
         tuple_identifier = Catalog.identifier_to_tuple(identifier)
         if tuple_identifier == ("default", "foo"):
@@ -125,6 +130,7 @@ class MockCatalog(Catalog):
                 metadata_location="s3://tmp/",
                 metadata=TableMetadataV2(**EXAMPLE_TABLE_METADATA_V2),
                 io=load_file_io(),
+                catalog=NoopCatalog("NoopCatalog"),
             )
         else:
             raise NoSuchTableError(f"Table does not exist: 
{'.'.join(tuple_identifier)}")
@@ -147,6 +153,7 @@ class MockCatalog(Catalog):
                 metadata_location="s3://tmp/",
                 metadata=TableMetadataV2(**EXAMPLE_TABLE_METADATA_V2),
                 io=load_file_io(),
+                catalog=NoopCatalog("NoopCatalog"),
             )
         else:
             raise NoSuchTableError(f"Table does not exist: {from_identifier}")
diff --git a/python/tests/io/test_pyarrow.py b/python/tests/io/test_pyarrow.py
index 3792aea03e..7a07532c66 100644
--- a/python/tests/io/test_pyarrow.py
+++ b/python/tests/io/test_pyarrow.py
@@ -27,6 +27,7 @@ import pytest
 from pyarrow.fs import FileType, LocalFileSystem
 
 from pyiceberg.avro.resolver import ResolveError
+from pyiceberg.catalog.noop import NoopCatalog
 from pyiceberg.expressions import (
     AlwaysFalse,
     AlwaysTrue,
@@ -821,6 +822,7 @@ def project(
             ),
             metadata_location="file://a/b/c.json",
             io=PyArrowFileIO(),
+            catalog=NoopCatalog("NoopCatalog"),
         ),
         expr or AlwaysTrue(),
         schema,
@@ -1232,6 +1234,7 @@ def test_delete(deletes_file: str, example_task: FileScanTask, table_schema_simp
             ),
             metadata_location=metadata_location,
             io=load_file_io(),
+            catalog=NoopCatalog("noop"),
         ),
         row_filter=AlwaysTrue(),
         projected_schema=table_schema_simple,
@@ -1274,6 +1277,7 @@ def test_delete_duplicates(deletes_file: str, example_task: FileScanTask, table_
             ),
             metadata_location=metadata_location,
             io=load_file_io(),
+            catalog=NoopCatalog("noop"),
         ),
         row_filter=AlwaysTrue(),
         projected_schema=table_schema_simple,
@@ -1308,6 +1312,7 @@ def test_pyarrow_wrap_fsspec(example_task: FileScanTask, table_schema_simple: Sc
             ),
             metadata_location=metadata_location,
             io=load_file_io(properties={"py-io-impl": 
"pyiceberg.io.fsspec.FsspecFileIO"}, location=metadata_location),
+            catalog=NoopCatalog("NoopCatalog"),
         ),
         case_sensitive=True,
         projected_schema=table_schema_simple,
diff --git a/python/tests/table/test_init.py b/python/tests/table/test_init.py
index 36a518076b..b421f81493 100644
--- a/python/tests/table/test_init.py
+++ b/python/tests/table/test_init.py
@@ -20,6 +20,7 @@ from typing import Any, Dict
 import pytest
 from sortedcontainers import SortedList
 
+from pyiceberg.catalog.noop import NoopCatalog
 from pyiceberg.expressions import (
     AlwaysTrue,
     And,
@@ -62,6 +63,7 @@ def table(example_table_metadata_v2: Dict[str, Any]) -> Table:
         metadata=table_metadata,
         metadata_location=f"{table_metadata.location}/uuid.metadata.json",
         io=load_file_io(),
+        catalog=NoopCatalog("NoopCatalog"),
     )
 
 
diff --git a/python/tests/test_integration.py b/python/tests/test_integration.py
index 14b3a36bda..3609920819 100644
--- a/python/tests/test_integration.py
+++ b/python/tests/test_integration.py
@@ -24,6 +24,7 @@ import pytest
 from pyarrow.fs import S3FileSystem
 
 from pyiceberg.catalog import Catalog, load_catalog
+from pyiceberg.exceptions import NoSuchTableError
 from pyiceberg.expressions import (
     And,
     GreaterThanOrEqual,
@@ -34,6 +35,13 @@ from pyiceberg.expressions import (
 from pyiceberg.io.pyarrow import pyarrow_to_schema
 from pyiceberg.schema import Schema
 from pyiceberg.table import Table
+from pyiceberg.types import (
+    BooleanType,
+    IntegerType,
+    NestedField,
+    StringType,
+    TimestampType,
+)
 
 
 @pytest.fixture()
@@ -70,6 +78,50 @@ def table_test_all_types(catalog: Catalog) -> Table:
     return catalog.load_table("default.test_all_types")
 
 
+TABLE_NAME = ("default", "t1")
+
+
+@pytest.fixture()
+def table(catalog: Catalog) -> Table:
+    try:
+        catalog.drop_table(TABLE_NAME)
+    except NoSuchTableError:
+        pass  # Just to make sure that the table doesn't exist
+
+    schema = Schema(
+        NestedField(field_id=1, name="str", field_type=StringType(), required=False),
+        NestedField(field_id=2, name="int", field_type=IntegerType(), required=True),
+        NestedField(field_id=3, name="bool", field_type=BooleanType(), required=False),
+        NestedField(field_id=4, name="datetime", field_type=TimestampType(), required=False),
+        schema_id=1,
+    )
+
+    return catalog.create_table(identifier=TABLE_NAME, schema=schema)
+
+
+@pytest.mark.integration
+def test_table_properties(table: Table) -> None:
+    assert table.properties == {}
+
+    with table.transaction() as transaction:
+        transaction.set_properties(abc="🤪")
+
+    assert table.properties == {"abc": "🤪"}
+
+    with table.transaction() as transaction:
+        transaction.remove_properties("abc")
+
+    assert table.properties == {}
+
+    table = table.transaction().set_properties(abc="def").commit_transaction()
+
+    assert table.properties == {"abc": "def"}
+
+    table = table.transaction().remove_properties("abc").commit_transaction()
+
+    assert table.properties == {}
+
+
 @pytest.fixture()
 def test_positional_mor_deletes(catalog: Catalog) -> Table:
     """Table that has positional deletes"""
