amoghrajesh commented on code in PR #68133:
URL: https://github.com/apache/airflow/pull/68133#discussion_r3400606504


##########
airflow-core/src/airflow/config_templates/config.yml:
##########
@@ -3239,6 +3239,21 @@ state_store:
       type: integer
       example: "10000"
       default: "0"
+    max_value_storage_bytes:
+      description: |
+        Only applicable to MetastoreStoreBackend. Maximum size in bytes that a single task or asset store
+        value written via the core or execution API can have. Values that exceed this limit are rejected
+        at the API boundary.
+
+        Workers writing via the execution API log a warning to use custom backends when this limit
+        is exceeded, but the write is allowed to avoid interrupting a task mid-execution.
+
+        The default of 65535 bytes (just under 64 KiB) is appropriate for coordination state values such as
+        job IDs, cursors, and small status maps. For larger payloads, use a custom state backend.
+      version_added: 3.3.0
+      type: integer
+      example: "1048576"
+      default: "65535"

Review Comment:
   Handled in [comments from kaxil](https://github.com/apache/airflow/pull/68133/commits/4adbfb1701aaf1d7618e06cfdc92d6f6e4e7ec83)



##########
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_store.py:
##########
@@ -305,6 +305,12 @@ def test_put_overwrites_expiry_on_existing_key(self, test_client, time_machine):
         assert resp["value"] == "v2"
         assert resp["expires_at"] is None
 
+    def test_put_value_over_limit_returns_422(self, test_client):
+        # default is 65535 bytes

Review Comment:
   Handled in [comments from kaxil](https://github.com/apache/airflow/pull/68133/commits/4adbfb1701aaf1d7618e06cfdc92d6f6e4e7ec83)



##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -600,7 +601,21 @@ def set(self, key: str, value: JsonValue, *, retention: timedelta | None = None)
             # wrap the value with a marker to indicate that it's stored externally, and include the ref to the external storage
             stored = _wrap_external_ref(ref)
 
-        SUPERVISOR_COMMS.send(SetTaskStore(ti_id=self._ti_id, key=key, value=stored, expires_at=expires_at))
+        msg = SetTaskStore(ti_id=self._ti_id, key=key, value=stored, expires_at=expires_at)
+
+        limit = conf.getint("state_store", "max_value_storage_bytes")
+        if limit > 0:
+            serialized_size = len(json.dumps(stored))
+            if serialized_size > limit:
+                log.warning(
+                    "Task store value for key %r is %d bytes, which exceeds configured max_value_storage_bytes=%d. "
+                    "Consider using a custom [state_store] backend to offload large payloads.",

Review Comment:
   Handled in [comments from kaxil](https://github.com/apache/airflow/pull/68133/commits/4adbfb1701aaf1d7618e06cfdc92d6f6e4e7ec83)
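
For readers following the hunk above, the worker-side behaviour (warn, but still let the write through) can be sketched outside Airflow. This is a minimal approximation and not the PR's code: `warn_if_oversized` is a hypothetical helper, and `limit` is passed in directly instead of being read with `conf.getint("state_store", "max_value_storage_bytes")`.

```python
import json
import logging
from typing import Any

log = logging.getLogger(__name__)


def warn_if_oversized(key: str, stored: Any, limit: int) -> bool:
    """Log a warning when the serialized value exceeds ``limit``.

    Hypothetical helper: returns True if a warning was logged. It never
    raises for size, mirroring the soft limit on the worker side.
    """
    # A limit of 0 (or less) disables the check, like the `limit > 0` guard in the diff.
    if limit <= 0:
        return False
    serialized_size = len(json.dumps(stored))
    if serialized_size > limit:
        log.warning(
            "Task store value for key %r is %d bytes, which exceeds "
            "configured max_value_storage_bytes=%d.",
            key,
            serialized_size,
            limit,
        )
        return True
    return False
```

The caller would send the message regardless of the return value, which is what keeps a running task from being interrupted mid-execution.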



##########
airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_store.py:
##########
@@ -66,8 +65,12 @@ def value_is_json_representable(cls, v: JsonValue) -> JsonValue:
             serialized = json.dumps(v, allow_nan=False)
         except ValueError:
             raise ValueError("value contains non-finite numbers; NaN and Inf are not JSON representable")
-        if len(serialized) > _MAX_SERIALIZED_BYTES:
-            raise ValueError(f"value exceeds maximum serialized size of {_MAX_SERIALIZED_BYTES} bytes")
+        limit = conf.getint("state_store", "max_value_storage_bytes")
+        if limit > 0 and len(serialized) > limit:
+            raise ValueError(
+                f"value exceeds max_value_storage_bytes ({limit}); "
+                "for large payloads configure a custom [state_store] backend"

Review Comment:
   Handled in [comments from kaxil](https://github.com/apache/airflow/pull/68133/commits/4adbfb1701aaf1d7618e06cfdc92d6f6e4e7ec83)
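
The hard limit at the API boundary in the hunk above can be tried in isolation. The sketch below is a standalone approximation under stated assumptions, not the PR's validator: `validate_value_size` is a hypothetical name, and `limit` is a plain argument standing in for the value read from `[state_store] max_value_storage_bytes`.

```python
import json
from typing import Any


def validate_value_size(value: Any, limit: int) -> str:
    """Return the JSON-serialized value, or raise ValueError.

    Hypothetical standalone version of the check shown in the diff.
    """
    try:
        # allow_nan=False makes json.dumps raise ValueError on NaN/Infinity.
        serialized = json.dumps(value, allow_nan=False)
    except ValueError:
        raise ValueError(
            "value contains non-finite numbers; NaN and Inf are not JSON representable"
        ) from None
    # A limit of 0 (or less) disables the size check.
    if limit > 0 and len(serialized) > limit:
        raise ValueError(f"value exceeds max_value_storage_bytes ({limit})")
    return serialized
```

Because `json.dumps` defaults to `ensure_ascii=True`, the serialized string contains only ASCII characters, so `len(serialized)` matches its UTF-8 byte length even when the value holds non-ASCII text.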



##########
airflow-core/src/airflow/config_templates/config.yml:
##########
@@ -3239,6 +3239,21 @@ state_store:
       type: integer
       example: "10000"
       default: "0"
+    max_value_storage_bytes:
+      description: |
+        Only applicable to MetastoreStoreBackend. Maximum size in bytes that a single task or asset store
+        value written via the core or execution API can have. Values that exceed this limit are rejected

Review Comment:
   Handled in [comments from kaxil](https://github.com/apache/airflow/pull/68133/commits/4adbfb1701aaf1d7618e06cfdc92d6f6e4e7ec83)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
