This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5f5fc1f4842 Add LRU cache eviction to CachingStateProvider (#37214)
5f5fc1f4842 is described below
commit 5f5fc1f4842f819f389858777efdc561514b606f
Author: M Junaid Shaukat <[email protected]>
AuthorDate: Fri Jan 16 20:01:06 2026 +0500
Add LRU cache eviction to CachingStateProvider (#37214)
* Add LRU cache eviction to CachingStateProvider
Fixes #37213
Implements LRU (Least Recently Used) cache eviction to prevent
unbounded memory growth in long-running workers. Adds configurable
maxCacheSize parameter (default: 1000 entries) and maintains LRU
order using JavaScript Map's insertion order.
- Add maxCacheSize constructor parameter with default value of 1000
- Implement evictIfNeeded() to remove oldest entry when cache is full
- Implement touchCacheEntry() to move accessed items to end (LRU)
- Add comprehensive test coverage in state_provider_test.ts
This addresses the TODO comment in the code and improves reliability
for production workloads.
* Address review comments: size-based LRU eviction for CachingStateProvider
- Fixed bug: removed incorrect evictIfNeeded() call in promise callback
- Removed unnecessary this_ variable (arrow functions capture this)
- Changed from count-based to size-based eviction (similar to Python
statecache.py)
- Added estimateSize() to calculate memory weight of cached values
- Default cache weight: 100MB
- Updated tests to work with weight-based eviction
* Fix prettier formatting
* Address review comments: circular references, eviction ordering, tests
- Fixed sizeof function to handle circular references using visited Set
- Fixed eviction ordering: add to cache first, then evict (fixes edge case)
- Added test for oversized item that exceeds maxCacheWeight
- Implemented custom sizeof instead of object-sizeof package (has Node.js
compatibility issues)
* Address Gemini comments: fix race condition, optimize evictIfNeeded
- Fixed critical race condition in promise callback: only update cache if
the entry is still the same promise we're resolving
- Optimized evictIfNeeded: use entries() iterator and removed redundant
checks
---
sdks/typescript/package-lock.json | 90 +++----
sdks/typescript/src/apache_beam/worker/state.ts | 135 +++++++++-
sdks/typescript/test/state_provider_test.ts | 328 ++++++++++++++++++++++++
3 files changed, 494 insertions(+), 59 deletions(-)
diff --git a/sdks/typescript/package-lock.json b/sdks/typescript/package-lock.json
index 76f314ad490..29918b01ab8 100644
--- a/sdks/typescript/package-lock.json
+++ b/sdks/typescript/package-lock.json
@@ -49,7 +49,6 @@
"version": "0.8.0",
"resolved":
"https://registry.npmjs.org/@cspotcode/source-map-consumer/-/source-map-consumer-0.8.0.tgz",
"integrity":
"sha512-41qniHzTU8yAGbCp04ohlmSrZf8bkf/iJsl3V0dRGsQN/5GFfx+LbCSsCpp2gqrqjTVg/K6O8ycoV35JIwAzAg==",
- "peer": true,
"engines": {
"node": ">= 12"
}
@@ -58,7 +57,6 @@
"version": "0.7.0",
"resolved":
"https://registry.npmjs.org/@cspotcode/source-map-support/-/source-map-support-0.7.0.tgz",
"integrity":
"sha512-X4xqRHqN8ACt2aHVe51OxeA2HjbcL4MqFqXkrmQszJ1NOUuUu5u6Vqx/0lZSVNku7velL5FC/s5uEAj1lsBMhA==",
- "peer": true,
"dependencies": {
"@cspotcode/source-map-consumer": "0.8.0"
},
@@ -194,6 +192,7 @@
"version": "1.4.6",
"resolved":
"https://registry.npmjs.org/@grpc/grpc-js/-/grpc-js-1.4.6.tgz",
"integrity":
"sha512-Byau4xiXfIixb1PnW30V/P9mkrZ05lknyNqiK+cVY9J5hj3gecxd/anwaUbAM8j834zg1x78NvAbwGnMfWEu7A==",
+ "peer": true,
"dependencies": {
"@grpc/proto-loader": "^0.6.4",
"@types/node": ">=12.12.47"
@@ -553,26 +552,22 @@
"node_modules/@tsconfig/node10": {
"version": "1.0.8",
"resolved":
"https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.8.tgz",
- "integrity":
"sha512-6XFfSQmMgq0CFLY1MslA/CPUfhIL919M1rMsa5lP2P097N2Wd1sSX0tx1u4olM16fLNhtHZpRhedZJphNJqmZg==",
- "peer": true
+ "integrity":
"sha512-6XFfSQmMgq0CFLY1MslA/CPUfhIL919M1rMsa5lP2P097N2Wd1sSX0tx1u4olM16fLNhtHZpRhedZJphNJqmZg=="
},
"node_modules/@tsconfig/node12": {
"version": "1.0.9",
"resolved":
"https://registry.npmjs.org/@tsconfig/node12/-/node12-1.0.9.tgz",
- "integrity":
"sha512-/yBMcem+fbvhSREH+s14YJi18sp7J9jpuhYByADT2rypfajMZZN4WQ6zBGgBKp53NKmqI36wFYDb3yaMPurITw==",
- "peer": true
+ "integrity":
"sha512-/yBMcem+fbvhSREH+s14YJi18sp7J9jpuhYByADT2rypfajMZZN4WQ6zBGgBKp53NKmqI36wFYDb3yaMPurITw=="
},
"node_modules/@tsconfig/node14": {
"version": "1.0.1",
"resolved":
"https://registry.npmjs.org/@tsconfig/node14/-/node14-1.0.1.tgz",
- "integrity":
"sha512-509r2+yARFfHHE7T6Puu2jjkoycftovhXRqW328PDXTVGKihlb1P8Z9mMZH04ebyajfRY7dedfGynlrFHJUQCg==",
- "peer": true
+ "integrity":
"sha512-509r2+yARFfHHE7T6Puu2jjkoycftovhXRqW328PDXTVGKihlb1P8Z9mMZH04ebyajfRY7dedfGynlrFHJUQCg=="
},
"node_modules/@tsconfig/node16": {
"version": "1.0.2",
"resolved":
"https://registry.npmjs.org/@tsconfig/node16/-/node16-1.0.2.tgz",
- "integrity":
"sha512-eZxlbI8GZscaGS7kkc/trHTT5xgrjH3/1n2JDwusC9iahPKWMRvRjJSAN5mCXviuTGQ/lHnhvv8Q1YTpnfz9gA==",
- "peer": true
+ "integrity":
"sha512-eZxlbI8GZscaGS7kkc/trHTT5xgrjH3/1n2JDwusC9iahPKWMRvRjJSAN5mCXviuTGQ/lHnhvv8Q1YTpnfz9gA=="
},
"node_modules/@types/duplexify": {
"version": "3.6.1",
@@ -602,7 +597,8 @@
"node_modules/@types/node": {
"version": "17.0.8",
"resolved": "https://registry.npmjs.org/@types/node/-/node-17.0.8.tgz",
- "integrity":
"sha512-YofkM6fGv4gDJq78g4j0mMuGMkZVxZDgtU0JRdx6FgiJDG+0fY0GKVolOV8WqVmEhLCXkQRjwDdKyPxJp/uucg=="
+ "integrity":
"sha512-YofkM6fGv4gDJq78g4j0mMuGMkZVxZDgtU0JRdx6FgiJDG+0fY0GKVolOV8WqVmEhLCXkQRjwDdKyPxJp/uucg==",
+ "peer": true
},
"node_modules/@typescript-eslint/eslint-plugin": {
"version": "5.24.0",
@@ -642,6 +638,7 @@
"resolved":
"https://registry.npmjs.org/@typescript-eslint/parser/-/parser-5.24.0.tgz",
"integrity":
"sha512-4q29C6xFYZ5B2CXqSBBdcS0lPyfM9M09DoQLtHS5kf+WbpV8pBBhHDLNhXfgyVwFnhrhYzOu7xmg02DzxeF2Uw==",
"dev": true,
+ "peer": true,
"dependencies": {
"@typescript-eslint/scope-manager": "5.24.0",
"@typescript-eslint/types": "5.24.0",
@@ -809,6 +806,7 @@
"version": "8.7.1",
"resolved": "https://registry.npmjs.org/acorn/-/acorn-8.7.1.tgz",
"integrity":
"sha512-Xx54uLJQZ19lKygFXOWsscKUbsBZW0CPykPhVQdhIeIwrbPmJzqeASDInc8nKBnp/JT6igTs82qPXz069H8I/A==",
+ "peer": true,
"bin": {
"acorn": "bin/acorn"
},
@@ -829,7 +827,6 @@
"version": "8.2.0",
"resolved":
"https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.2.0.tgz",
"integrity":
"sha512-k+iyHEuPgSw6SbuDpGQM+06HQUa04DZ3o+F6CSzXMvvI5KMvnaEqXe+YVe555R9nn6GPt404fos4wcgpw12SDA==",
- "peer": true,
"engines": {
"node": ">=0.4.0"
}
@@ -919,8 +916,7 @@
"node_modules/arg": {
"version": "4.1.3",
"resolved": "https://registry.npmjs.org/arg/-/arg-4.1.3.tgz",
- "integrity":
"sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA==",
- "peer": true
+ "integrity":
"sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA=="
},
"node_modules/argle": {
"version": "1.1.1",
@@ -1294,8 +1290,7 @@
"node_modules/create-require": {
"version": "1.1.1",
"resolved":
"https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz",
- "integrity":
"sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==",
- "peer": true
+ "integrity":
"sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ=="
},
"node_modules/cross-spawn": {
"version": "7.0.3",
@@ -1526,6 +1521,7 @@
"resolved": "https://registry.npmjs.org/eslint/-/eslint-8.15.0.tgz",
"integrity":
"sha512-GG5USZ1jhCu8HJkzGgeK8/+RGnHaNYZGrGDzUtigK3BsGESW/rs2az23XqE0WVwDxy1VRvvjSSGu5nB0Bu+6SA==",
"dev": true,
+ "peer": true,
"dependencies": {
"@eslint/eslintrc": "^1.2.3",
"@humanwhocodes/config-array": "^0.9.2",
@@ -2874,8 +2870,7 @@
"node_modules/make-error": {
"version": "1.3.6",
"resolved":
"https://registry.npmjs.org/make-error/-/make-error-1.3.6.tgz",
- "integrity":
"sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==",
- "peer": true
+ "integrity":
"sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw=="
},
"node_modules/marked": {
"version": "4.2.5",
@@ -2957,6 +2952,7 @@
"integrity":
"sha512-8uJR5RTC2NgpY3GrYcgpZrsEd9zKbPDpob1RezyR2upGHRQtHWofmzTMzTMSV6dru3tj5Ukt0+Vnq1qhFEEwAg==",
"dev": true,
"license": "MIT",
+ "peer": true,
"dependencies": {
"ansi-colors": "^4.1.3",
"browser-stdout": "^1.3.1",
@@ -3890,7 +3886,6 @@
"version": "10.7.0",
"resolved": "https://registry.npmjs.org/ts-node/-/ts-node-10.7.0.tgz",
"integrity":
"sha512-TbIGS4xgJoX2i3do417KSaep1uRAW/Lu+WAL2doDHC0D6ummjirVOXU5/7aiZotbQ5p1Zp9tP7U6cYhA0O7M8A==",
- "peer": true,
"dependencies": {
"@cspotcode/source-map-support": "0.7.0",
"@tsconfig/node10": "^1.0.7",
@@ -3933,7 +3928,6 @@
"version": "4.0.2",
"resolved": "https://registry.npmjs.org/diff/-/diff-4.0.2.tgz",
"integrity":
"sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A==",
- "peer": true,
"engines": {
"node": ">=0.3.1"
}
@@ -4069,6 +4063,7 @@
"version": "4.7.4",
"resolved":
"https://registry.npmjs.org/typescript/-/typescript-4.7.4.tgz",
"integrity":
"sha512-C0WQT0gezHuw6AdY1M2jxUO83Rjf0HP7Sk1DtXj6j1EwkQNZrHAg2XPWlq62oqEhYvONq5pkC2Y9oPljWToLmQ==",
+ "peer": true,
"bin": {
"tsc": "bin/tsc",
"tsserver": "bin/tsserver"
@@ -4139,8 +4134,7 @@
"node_modules/v8-compile-cache-lib": {
"version": "3.0.0",
"resolved":
"https://registry.npmjs.org/v8-compile-cache-lib/-/v8-compile-cache-lib-3.0.0.tgz",
- "integrity":
"sha512-mpSYqfsFvASnSn5qMiwrr4VKfumbPyONLCOPmsR3A6pTY/r0+tSaVbgPWSAIuzbk3lCTa+FForeTiO+wBQGkjA==",
- "peer": true
+ "integrity":
"sha512-mpSYqfsFvASnSn5qMiwrr4VKfumbPyONLCOPmsR3A6pTY/r0+tSaVbgPWSAIuzbk3lCTa+FForeTiO+wBQGkjA=="
},
"node_modules/vscode-oniguruma": {
"version": "1.7.0",
@@ -4302,7 +4296,6 @@
"version": "3.1.1",
"resolved": "https://registry.npmjs.org/yn/-/yn-3.1.1.tgz",
"integrity":
"sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q==",
- "peer": true,
"engines": {
"node": ">=6"
}
@@ -4324,14 +4317,12 @@
"@cspotcode/source-map-consumer": {
"version": "0.8.0",
"resolved":
"https://registry.npmjs.org/@cspotcode/source-map-consumer/-/source-map-consumer-0.8.0.tgz",
- "integrity":
"sha512-41qniHzTU8yAGbCp04ohlmSrZf8bkf/iJsl3V0dRGsQN/5GFfx+LbCSsCpp2gqrqjTVg/K6O8ycoV35JIwAzAg==",
- "peer": true
+ "integrity":
"sha512-41qniHzTU8yAGbCp04ohlmSrZf8bkf/iJsl3V0dRGsQN/5GFfx+LbCSsCpp2gqrqjTVg/K6O8ycoV35JIwAzAg=="
},
"@cspotcode/source-map-support": {
"version": "0.7.0",
"resolved":
"https://registry.npmjs.org/@cspotcode/source-map-support/-/source-map-support-0.7.0.tgz",
"integrity":
"sha512-X4xqRHqN8ACt2aHVe51OxeA2HjbcL4MqFqXkrmQszJ1NOUuUu5u6Vqx/0lZSVNku7velL5FC/s5uEAj1lsBMhA==",
- "peer": true,
"requires": {
"@cspotcode/source-map-consumer": "0.8.0"
}
@@ -4440,6 +4431,7 @@
"version": "1.4.6",
"resolved":
"https://registry.npmjs.org/@grpc/grpc-js/-/grpc-js-1.4.6.tgz",
"integrity":
"sha512-Byau4xiXfIixb1PnW30V/P9mkrZ05lknyNqiK+cVY9J5hj3gecxd/anwaUbAM8j834zg1x78NvAbwGnMfWEu7A==",
+ "peer": true,
"requires": {
"@grpc/proto-loader": "^0.6.4",
"@types/node": ">=12.12.47"
@@ -4707,26 +4699,22 @@
"@tsconfig/node10": {
"version": "1.0.8",
"resolved":
"https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.8.tgz",
- "integrity":
"sha512-6XFfSQmMgq0CFLY1MslA/CPUfhIL919M1rMsa5lP2P097N2Wd1sSX0tx1u4olM16fLNhtHZpRhedZJphNJqmZg==",
- "peer": true
+ "integrity":
"sha512-6XFfSQmMgq0CFLY1MslA/CPUfhIL919M1rMsa5lP2P097N2Wd1sSX0tx1u4olM16fLNhtHZpRhedZJphNJqmZg=="
},
"@tsconfig/node12": {
"version": "1.0.9",
"resolved":
"https://registry.npmjs.org/@tsconfig/node12/-/node12-1.0.9.tgz",
- "integrity":
"sha512-/yBMcem+fbvhSREH+s14YJi18sp7J9jpuhYByADT2rypfajMZZN4WQ6zBGgBKp53NKmqI36wFYDb3yaMPurITw==",
- "peer": true
+ "integrity":
"sha512-/yBMcem+fbvhSREH+s14YJi18sp7J9jpuhYByADT2rypfajMZZN4WQ6zBGgBKp53NKmqI36wFYDb3yaMPurITw=="
},
"@tsconfig/node14": {
"version": "1.0.1",
"resolved":
"https://registry.npmjs.org/@tsconfig/node14/-/node14-1.0.1.tgz",
- "integrity":
"sha512-509r2+yARFfHHE7T6Puu2jjkoycftovhXRqW328PDXTVGKihlb1P8Z9mMZH04ebyajfRY7dedfGynlrFHJUQCg==",
- "peer": true
+ "integrity":
"sha512-509r2+yARFfHHE7T6Puu2jjkoycftovhXRqW328PDXTVGKihlb1P8Z9mMZH04ebyajfRY7dedfGynlrFHJUQCg=="
},
"@tsconfig/node16": {
"version": "1.0.2",
"resolved":
"https://registry.npmjs.org/@tsconfig/node16/-/node16-1.0.2.tgz",
- "integrity":
"sha512-eZxlbI8GZscaGS7kkc/trHTT5xgrjH3/1n2JDwusC9iahPKWMRvRjJSAN5mCXviuTGQ/lHnhvv8Q1YTpnfz9gA==",
- "peer": true
+ "integrity":
"sha512-eZxlbI8GZscaGS7kkc/trHTT5xgrjH3/1n2JDwusC9iahPKWMRvRjJSAN5mCXviuTGQ/lHnhvv8Q1YTpnfz9gA=="
},
"@types/duplexify": {
"version": "3.6.1",
@@ -4756,7 +4744,8 @@
"@types/node": {
"version": "17.0.8",
"resolved": "https://registry.npmjs.org/@types/node/-/node-17.0.8.tgz",
- "integrity":
"sha512-YofkM6fGv4gDJq78g4j0mMuGMkZVxZDgtU0JRdx6FgiJDG+0fY0GKVolOV8WqVmEhLCXkQRjwDdKyPxJp/uucg=="
+ "integrity":
"sha512-YofkM6fGv4gDJq78g4j0mMuGMkZVxZDgtU0JRdx6FgiJDG+0fY0GKVolOV8WqVmEhLCXkQRjwDdKyPxJp/uucg==",
+ "peer": true
},
"@typescript-eslint/eslint-plugin": {
"version": "5.24.0",
@@ -4780,6 +4769,7 @@
"resolved":
"https://registry.npmjs.org/@typescript-eslint/parser/-/parser-5.24.0.tgz",
"integrity":
"sha512-4q29C6xFYZ5B2CXqSBBdcS0lPyfM9M09DoQLtHS5kf+WbpV8pBBhHDLNhXfgyVwFnhrhYzOu7xmg02DzxeF2Uw==",
"dev": true,
+ "peer": true,
"requires": {
"@typescript-eslint/scope-manager": "5.24.0",
"@typescript-eslint/types": "5.24.0",
@@ -4870,7 +4860,8 @@
"acorn": {
"version": "8.7.1",
"resolved": "https://registry.npmjs.org/acorn/-/acorn-8.7.1.tgz",
- "integrity":
"sha512-Xx54uLJQZ19lKygFXOWsscKUbsBZW0CPykPhVQdhIeIwrbPmJzqeASDInc8nKBnp/JT6igTs82qPXz069H8I/A=="
+ "integrity":
"sha512-Xx54uLJQZ19lKygFXOWsscKUbsBZW0CPykPhVQdhIeIwrbPmJzqeASDInc8nKBnp/JT6igTs82qPXz069H8I/A==",
+ "peer": true
},
"acorn-jsx": {
"version": "5.3.2",
@@ -4882,8 +4873,7 @@
"acorn-walk": {
"version": "8.2.0",
"resolved":
"https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.2.0.tgz",
- "integrity":
"sha512-k+iyHEuPgSw6SbuDpGQM+06HQUa04DZ3o+F6CSzXMvvI5KMvnaEqXe+YVe555R9nn6GPt404fos4wcgpw12SDA==",
- "peer": true
+ "integrity":
"sha512-k+iyHEuPgSw6SbuDpGQM+06HQUa04DZ3o+F6CSzXMvvI5KMvnaEqXe+YVe555R9nn6GPt404fos4wcgpw12SDA=="
},
"agent-base": {
"version": "6.0.2",
@@ -4944,8 +4934,7 @@
"arg": {
"version": "4.1.3",
"resolved": "https://registry.npmjs.org/arg/-/arg-4.1.3.tgz",
- "integrity":
"sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA==",
- "peer": true
+ "integrity":
"sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA=="
},
"argle": {
"version": "1.1.1",
@@ -5213,8 +5202,7 @@
"create-require": {
"version": "1.1.1",
"resolved":
"https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz",
- "integrity":
"sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==",
- "peer": true
+ "integrity":
"sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ=="
},
"cross-spawn": {
"version": "7.0.3",
@@ -5376,6 +5364,7 @@
"resolved": "https://registry.npmjs.org/eslint/-/eslint-8.15.0.tgz",
"integrity":
"sha512-GG5USZ1jhCu8HJkzGgeK8/+RGnHaNYZGrGDzUtigK3BsGESW/rs2az23XqE0WVwDxy1VRvvjSSGu5nB0Bu+6SA==",
"dev": true,
+ "peer": true,
"requires": {
"@eslint/eslintrc": "^1.2.3",
"@humanwhocodes/config-array": "^0.9.2",
@@ -6402,8 +6391,7 @@
"make-error": {
"version": "1.3.6",
"resolved":
"https://registry.npmjs.org/make-error/-/make-error-1.3.6.tgz",
- "integrity":
"sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==",
- "peer": true
+ "integrity":
"sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw=="
},
"marked": {
"version": "4.2.5",
@@ -6462,6 +6450,7 @@
"resolved": "https://registry.npmjs.org/mocha/-/mocha-11.1.0.tgz",
"integrity":
"sha512-8uJR5RTC2NgpY3GrYcgpZrsEd9zKbPDpob1RezyR2upGHRQtHWofmzTMzTMSV6dru3tj5Ukt0+Vnq1qhFEEwAg==",
"dev": true,
+ "peer": true,
"requires": {
"ansi-colors": "^4.1.3",
"browser-stdout": "^1.3.1",
@@ -7113,7 +7102,6 @@
"version": "10.7.0",
"resolved": "https://registry.npmjs.org/ts-node/-/ts-node-10.7.0.tgz",
"integrity":
"sha512-TbIGS4xgJoX2i3do417KSaep1uRAW/Lu+WAL2doDHC0D6ummjirVOXU5/7aiZotbQ5p1Zp9tP7U6cYhA0O7M8A==",
- "peer": true,
"requires": {
"@cspotcode/source-map-support": "0.7.0",
"@tsconfig/node10": "^1.0.7",
@@ -7133,8 +7121,7 @@
"diff": {
"version": "4.0.2",
"resolved": "https://registry.npmjs.org/diff/-/diff-4.0.2.tgz",
- "integrity":
"sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A==",
- "peer": true
+ "integrity":
"sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A=="
}
}
},
@@ -7228,7 +7215,8 @@
"typescript": {
"version": "4.7.4",
"resolved":
"https://registry.npmjs.org/typescript/-/typescript-4.7.4.tgz",
- "integrity":
"sha512-C0WQT0gezHuw6AdY1M2jxUO83Rjf0HP7Sk1DtXj6j1EwkQNZrHAg2XPWlq62oqEhYvONq5pkC2Y9oPljWToLmQ=="
+ "integrity":
"sha512-C0WQT0gezHuw6AdY1M2jxUO83Rjf0HP7Sk1DtXj6j1EwkQNZrHAg2XPWlq62oqEhYvONq5pkC2Y9oPljWToLmQ==",
+ "peer": true
},
"uglify-js": {
"version": "3.15.1",
@@ -7282,8 +7270,7 @@
"v8-compile-cache-lib": {
"version": "3.0.0",
"resolved":
"https://registry.npmjs.org/v8-compile-cache-lib/-/v8-compile-cache-lib-3.0.0.tgz",
- "integrity":
"sha512-mpSYqfsFvASnSn5qMiwrr4VKfumbPyONLCOPmsR3A6pTY/r0+tSaVbgPWSAIuzbk3lCTa+FForeTiO+wBQGkjA==",
- "peer": true
+ "integrity":
"sha512-mpSYqfsFvASnSn5qMiwrr4VKfumbPyONLCOPmsR3A6pTY/r0+tSaVbgPWSAIuzbk3lCTa+FForeTiO+wBQGkjA=="
},
"vscode-oniguruma": {
"version": "1.7.0",
@@ -7408,8 +7395,7 @@
"yn": {
"version": "3.1.1",
"resolved": "https://registry.npmjs.org/yn/-/yn-3.1.1.tgz",
- "integrity":
"sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q==",
- "peer": true
+ "integrity":
"sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q=="
},
"yocto-queue": {
"version": "0.1.0",
diff --git a/sdks/typescript/src/apache_beam/worker/state.ts b/sdks/typescript/src/apache_beam/worker/state.ts
index 5a340cbb64f..5e7466a2a86 100644
--- a/sdks/typescript/src/apache_beam/worker/state.ts
+++ b/sdks/typescript/src/apache_beam/worker/state.ts
@@ -46,12 +46,110 @@ export interface StateProvider {
}
// TODO: (Advanced) Cross-bundle caching.
+/**
+ * Wrapper for cached values that tracks their weight (memory size).
+ */
+interface WeightedCacheEntry<T> {
+ entry: MaybePromise<T>;
+ weight: number;
+}
+
+// Default weight for values that cannot be sized (e.g., promises)
+const DEFAULT_WEIGHT = 64;
+
+/**
+ * Estimates the memory size of a value in bytes.
+ * Handles circular references by tracking visited objects.
+ */
+function sizeof(value: any, visited: Set<any> = new Set()): number {
+ if (value === null || value === undefined) {
+ return 8;
+ }
+
+ // Handle circular references for objects
+ if (typeof value === "object") {
+ if (visited.has(value)) {
+ return 8; // Account for reference size, not the full object again
+ }
+ visited.add(value);
+ }
+
+ const type = typeof value;
+
+ if (type === "boolean") {
+ return 4;
+ }
+ if (type === "number") {
+ return 8;
+ }
+ if (type === "string") {
+ // Each character is 2 bytes in JavaScript (UTF-16) + overhead
+ return 40 + value.length * 2;
+ }
+ if (value instanceof Uint8Array || value instanceof Buffer) {
+ return 40 + value.length;
+ }
+ if (Array.isArray(value)) {
+ let size = 40; // Array overhead
+ for (const item of value) {
+ size += sizeof(item, visited);
+ }
+ return size;
+ }
+ if (type === "object") {
+ let size = 40; // Object overhead
+ for (const key of Object.keys(value)) {
+ size += sizeof(key, visited) + sizeof(value[key], visited);
+ }
+ return size;
+ }
+
+ // Default for unknown types
+ return DEFAULT_WEIGHT;
+}
+
+// Default cache size: 100MB
+const DEFAULT_MAX_CACHE_WEIGHT = 100 * 1024 * 1024;
+
export class CachingStateProvider implements StateProvider {
underlying: StateProvider;
- cache: Map<string, MaybePromise<any>> = new Map();
+ cache: Map<string, WeightedCacheEntry<any>> = new Map();
+ maxCacheWeight: number;
+ currentWeight: number = 0;
- constructor(underlying: StateProvider) {
+ constructor(
+ underlying: StateProvider,
+ maxCacheWeight: number = DEFAULT_MAX_CACHE_WEIGHT,
+ ) {
this.underlying = underlying;
+ this.maxCacheWeight = maxCacheWeight;
+ }
+
+ /**
+ * Evicts least recently used entries until the cache is under the weight limit.
+ * JavaScript Maps preserve insertion order, so the first entry is the oldest.
+ */
+ private evictIfNeeded() {
+ while (this.currentWeight > this.maxCacheWeight && this.cache.size > 0) {
+ // Get the first (oldest) entry from the map iterator
+ const firstEntry = this.cache.entries().next().value;
+ const firstKey = firstEntry[0];
+ const evictedEntry = firstEntry[1];
+ this.currentWeight -= evictedEntry.weight;
+ this.cache.delete(firstKey);
+ }
+ }
+
+ /**
+ * Moves a cache entry to the end (most recently used) by deleting and re-adding it.
+ * This maintains LRU order: most recently accessed items are at the end.
+ */
+ private touchCacheEntry(cacheKey: string) {
+ const value = this.cache.get(cacheKey);
+ if (value !== undefined) {
+ this.cache.delete(cacheKey);
+ this.cache.set(cacheKey, value);
+ }
}
getState<T>(stateKey: fnApi.StateKey, decode: (data: Uint8Array) => T) {
@@ -62,21 +160,44 @@ export class CachingStateProvider implements StateProvider {
"base64",
);
if (this.cache.has(cacheKey)) {
- return this.cache.get(cacheKey)!;
+ // Cache hit: move to end (most recently used)
+ this.touchCacheEntry(cacheKey);
+ return this.cache.get(cacheKey)!.entry;
}
+ // Cache miss: fetch from underlying provider
let result = this.underlying.getState(stateKey, decode);
- const this_ = this;
if (result.type === "promise") {
result = {
type: "promise",
promise: result.promise.then((value) => {
- this_.cache.set(cacheKey, { type: "value", value });
+ // When promise resolves, update cache with resolved value
+ const currentEntry = this.cache.get(cacheKey);
+ // Only update if the entry in the cache is still the promise we are resolving.
+ // This prevents a race condition where the entry is evicted and replaced
+ // before this promise resolves.
+ // before this promise resolves.
+ if (currentEntry?.entry === result) {
+ // Remove old weight (of the promise) from total
+ this.currentWeight -= currentEntry.weight;
+
+ const resolvedWeight = sizeof(value);
+ this.cache.set(cacheKey, {
+ entry: { type: "value", value },
+ weight: resolvedWeight,
+ });
+ this.currentWeight += resolvedWeight;
+ this.evictIfNeeded();
+ }
return value;
}),
};
}
- // TODO: (Perf) Cache eviction.
- this.cache.set(cacheKey, result);
+ // Calculate weight for the new entry
+ const weight =
+ result.type === "value" ? sizeof(result.value) : DEFAULT_WEIGHT;
+ // Add new entry to cache and then evict if needed
+ this.currentWeight += weight;
+ this.cache.set(cacheKey, { entry: result, weight });
+ this.evictIfNeeded();
return result;
}
}
diff --git a/sdks/typescript/test/state_provider_test.ts b/sdks/typescript/test/state_provider_test.ts
new file mode 100644
index 00000000000..30b71e78295
--- /dev/null
+++ b/sdks/typescript/test/state_provider_test.ts
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as assert from "assert";
+import {
+ CachingStateProvider,
+ StateProvider,
+ MaybePromise,
+} from "../src/apache_beam/worker/state";
+import * as fnApi from "../src/apache_beam/proto/beam_fn_api";
+
+/**
+ * Mock StateProvider for testing that tracks call counts.
+ */
+class MockStateProvider implements StateProvider {
+ callCount: number = 0;
+ values: Map<string, any> = new Map();
+ delayMs: number = 0;
+
+ constructor(delayMs: number = 0) {
+ this.delayMs = delayMs;
+ }
+
+ setValue(key: string, value: any) {
+ this.values.set(key, value);
+ }
+
+ getState<T>(
+ stateKey: fnApi.StateKey,
+ decode: (data: Uint8Array) => T,
+ ): MaybePromise<T> {
+ this.callCount++;
+ const key = Buffer.from(fnApi.StateKey.toBinary(stateKey)).toString(
+ "base64",
+ );
+ const value = this.values.get(key);
+
+ if (this.delayMs > 0) {
+ return {
+ type: "promise",
+ promise: new Promise<T>((resolve) => {
+ setTimeout(() => resolve(value), this.delayMs);
+ }),
+ };
+ } else {
+ return { type: "value", value };
+ }
+ }
+}
+
+describe("CachingStateProvider", function () {
+ it("caches values and returns cached result on subsequent calls", function
() {
+ const mockProvider = new MockStateProvider();
+ // Use large weight limit to ensure no eviction for this test
+ const cache = new CachingStateProvider(mockProvider, 10 * 1024);
+
+ const stateKey: fnApi.StateKey = {
+ type: {
+ oneofKind: "bagUserState",
+ bagUserState: fnApi.StateKey_BagUserState.create({
+ transformId: "test",
+ userStateId: "state1",
+ window: new Uint8Array(0),
+ key: new Uint8Array(0),
+ }),
+ },
+ };
+
+ const decode = (data: Uint8Array) => data.toString();
+
+ // Set value in mock
+ const testValue = "cached_value";
+ const key = Buffer.from(fnApi.StateKey.toBinary(stateKey)).toString(
+ "base64",
+ );
+ mockProvider.setValue(key, testValue);
+
+ // First call should hit underlying provider
+ const result1 = cache.getState(stateKey, decode);
+ assert.equal(mockProvider.callCount, 1);
+ assert.equal(result1.type, "value");
+ if (result1.type === "value") {
+ assert.equal(result1.value, testValue);
+ }
+
+ // Second call should use cache
+ const result2 = cache.getState(stateKey, decode);
+ assert.equal(mockProvider.callCount, 1); // Still 1, not 2
+ assert.equal(result2.type, "value");
+ if (result2.type === "value") {
+ assert.equal(result2.value, testValue);
+ }
+ });
+
+ it("evicts least recently used entry when cache weight exceeds limit",
function () {
+ const mockProvider = new MockStateProvider();
+ // Each small string "valueX" is approximately 52 bytes (40 + 6*2)
+ // Set weight limit to hold approximately 3 entries
+ const cache = new CachingStateProvider(mockProvider, 180);
+
+ const decode = (data: Uint8Array) => data.toString();
+
+ // Create 4 different state keys
+ const keys: fnApi.StateKey[] = [];
+ for (let i = 0; i < 4; i++) {
+ keys.push({
+ type: {
+ oneofKind: "bagUserState",
+ bagUserState: fnApi.StateKey_BagUserState.create({
+ transformId: "test",
+ userStateId: `state${i}`,
+ window: new Uint8Array(0),
+ key: new Uint8Array(0),
+ }),
+ },
+ });
+ }
+
+ // Set values in mock
+ for (let i = 0; i < 4; i++) {
+ const key = Buffer.from(fnApi.StateKey.toBinary(keys[i])).toString(
+ "base64",
+ );
+ mockProvider.setValue(key, `value${i}`);
+ }
+
+ // Fill cache with 3 entries
+ cache.getState(keys[0], decode);
+ cache.getState(keys[1], decode);
+ cache.getState(keys[2], decode);
+ assert.equal(mockProvider.callCount, 3);
+ assert.equal(cache.cache.size, 3);
+
+ // Access keys[0] to make it most recently used
+ cache.getState(keys[0], decode);
+ assert.equal(mockProvider.callCount, 3); // Still cached
+
+ // Add 4th entry - should evict keys[1] (least recently used, not keys[0])
+ cache.getState(keys[3], decode);
+ assert.equal(mockProvider.callCount, 4);
+
+ // keys[1] should be evicted (not in cache)
+ const result1 = cache.getState(keys[1], decode);
+ assert.equal(mockProvider.callCount, 5); // Had to fetch again
+ assert.equal(result1.type, "value");
+ if (result1.type === "value") {
+ assert.equal(result1.value, "value1");
+ }
+
+ // keys[0] should still be cached (was most recently used)
+ const result0 = cache.getState(keys[0], decode);
+ assert.equal(mockProvider.callCount, 5); // Still cached, no new call
+ assert.equal(result0.type, "value");
+ if (result0.type === "value") {
+ assert.equal(result0.value, "value0");
+ }
+ });
+
+ it("handles promise-based state fetches correctly", async function () {
+ const mockProvider = new MockStateProvider(10); // 10ms delay
+ // Use large weight limit to ensure no eviction for this test
+ const cache = new CachingStateProvider(mockProvider, 10 * 1024);
+
+ const stateKey: fnApi.StateKey = {
+ type: {
+ oneofKind: "bagUserState",
+ bagUserState: fnApi.StateKey_BagUserState.create({
+ transformId: "test",
+ userStateId: "async_state",
+ window: new Uint8Array(0),
+ key: new Uint8Array(0),
+ }),
+ },
+ };
+
+ const decode = (data: Uint8Array) => data.toString();
+ const key = Buffer.from(fnApi.StateKey.toBinary(stateKey)).toString(
+ "base64",
+ );
+ mockProvider.setValue(key, "async_value");
+
+ // First call returns promise
+ const result1 = cache.getState(stateKey, decode);
+ assert.equal(result1.type, "promise");
+ assert.equal(mockProvider.callCount, 1);
+
+ // Wait for promise to resolve
+ if (result1.type === "promise") {
+ const value1 = await result1.promise;
+ assert.equal(value1, "async_value");
+
+ // Second call should return cached value (not promise)
+ const result2 = cache.getState(stateKey, decode);
+ assert.equal(result2.type, "value");
+ assert.equal(mockProvider.callCount, 1); // Still only 1 call
+ if (result2.type === "value") {
+ assert.equal(result2.value, "async_value");
+ }
+ }
+ });
+
+ it("respects custom maxCacheWeight and evicts based on memory size",
function () {
+ const mockProvider = new MockStateProvider();
+ // Set weight limit to hold approximately 2 small string entries
+ const cache = new CachingStateProvider(mockProvider, 120);
+
+ const decode = (data: Uint8Array) => data.toString();
+
+ const keys: fnApi.StateKey[] = [];
+ for (let i = 0; i < 3; i++) {
+ keys.push({
+ type: {
+ oneofKind: "bagUserState",
+ bagUserState: fnApi.StateKey_BagUserState.create({
+ transformId: "test",
+ userStateId: `state${i}`,
+ window: new Uint8Array(0),
+ key: new Uint8Array(0),
+ }),
+ },
+ });
+ const key = Buffer.from(fnApi.StateKey.toBinary(keys[i])).toString(
+ "base64",
+ );
+ mockProvider.setValue(key, `value${i}`);
+ }
+
+ // Fill cache with 2 entries
+ cache.getState(keys[0], decode);
+ cache.getState(keys[1], decode);
+ assert.equal(cache.cache.size, 2);
+
+ // Add 3rd entry - should evict oldest to stay under weight limit
+ cache.getState(keys[2], decode);
+
+ // First entry should be evicted
+ cache.getState(keys[0], decode);
+ assert.equal(mockProvider.callCount, 4); // Had to fetch keys[0] again
+ });
+
+ it("tracks cache weight correctly", function () {
+ const mockProvider = new MockStateProvider();
+ const cache = new CachingStateProvider(mockProvider, 10 * 1024);
+
+ const decode = (data: Uint8Array) => data.toString();
+
+ const stateKey: fnApi.StateKey = {
+ type: {
+ oneofKind: "bagUserState",
+ bagUserState: fnApi.StateKey_BagUserState.create({
+ transformId: "test",
+ userStateId: "state1",
+ window: new Uint8Array(0),
+ key: new Uint8Array(0),
+ }),
+ },
+ };
+
+ const key = Buffer.from(fnApi.StateKey.toBinary(stateKey)).toString(
+ "base64",
+ );
+ mockProvider.setValue(key, "test_value");
+
+ // Cache should start with 0 weight
+ assert.equal(cache.currentWeight, 0);
+
+ // After adding an entry, weight should increase
+ cache.getState(stateKey, decode);
+ assert.ok(cache.currentWeight > 0);
+ });
+
+ it("evicts oversized item that exceeds maxCacheWeight", function () {
+ const mockProvider = new MockStateProvider();
+ // Set a very small weight limit (10 bytes)
+ const cache = new CachingStateProvider(mockProvider, 10);
+
+ const decode = (data: Uint8Array) => data.toString();
+
+ const stateKey: fnApi.StateKey = {
+ type: {
+ oneofKind: "bagUserState",
+ bagUserState: fnApi.StateKey_BagUserState.create({
+ transformId: "test",
+ userStateId: "oversized_state",
+ window: new Uint8Array(0),
+ key: new Uint8Array(0),
+ }),
+ },
+ };
+
+ const key = Buffer.from(fnApi.StateKey.toBinary(stateKey)).toString(
+ "base64",
+ );
+ // Create a large value that exceeds the cache weight limit
+ const largeValue = "this_is_a_very_large_value_that_exceeds_the_limit";
+ mockProvider.setValue(key, largeValue);
+
+ // Cache should start empty
+ assert.equal(cache.cache.size, 0);
+ assert.equal(cache.currentWeight, 0);
+
+ // Add the oversized item - it should be added and then immediately evicted
+ cache.getState(stateKey, decode);
+
+ // The cache should be empty after eviction (item was added then evicted)
+ assert.equal(cache.cache.size, 0);
+ assert.equal(cache.currentWeight, 0);
+
+ // Fetching again should hit the underlying provider since item was evicted
+ cache.getState(stateKey, decode);
+ assert.equal(mockProvider.callCount, 2);
+ });
+});