changeset 9dfa56e85ce4 in modules/stock:default
details: https://hg.tryton.org/modules/stock?cmd=changeset&node=9dfa56e85ce4
description:
Allow grouping by date products by location
issue11639
review431411003
diffstat:
CHANGELOG | 1 +
move.py | 39 ++++++++++++++++++++--------
tests/test_module.py | 70 ++++++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 98 insertions(+), 12 deletions(-)
diffs (192 lines):
diff -r 8e0c5d43ed6b -r 9dfa56e85ce4 CHANGELOG
--- a/CHANGELOG Thu Sep 08 13:10:15 2022 +0200
+++ b/CHANGELOG Thu Sep 08 13:12:48 2022 +0200
@@ -1,3 +1,4 @@
+* Allow grouping by date products by location
* Ignore consumable products when inactivating locations
Version 6.4.0 - 2022-05-02
diff -r 8e0c5d43ed6b -r 9dfa56e85ce4 move.py
--- a/move.py Thu Sep 08 13:10:15 2022 +0200
+++ b/move.py Thu Sep 08 13:12:48 2022 +0200
@@ -1137,8 +1137,8 @@
stock_destinations: A list of location ids. If set, restrict the
computation to moves from and to those locations.
If with_childs, it computes also for child locations.
- grouping is a tuple of Move (or Product if prefixed by 'product.')
- field names and defines how stock moves are grouped.
+ grouping is a tuple of Move field names (Product ones if prefixed by
+ 'product.') or 'date', and defines how stock moves are grouped.
grouping_filter is a tuple of values, for the Move's field at the same
position in grouping tuple, used to filter which moves are used to
compute quantities. It must be None or have the same number of
@@ -1171,14 +1171,22 @@
use_product = True
else:
Model = Move
- if field not in Model._fields:
+ if field not in Model._fields and field != 'date':
raise ValueError('"%s" has no field "%s"' % (Model, field))
assert grouping_filter is None or len(grouping_filter) <= len(grouping)
assert len(set(grouping)) == len(grouping)
+ assert ('stock_date_start' in context) if 'date' in grouping else True
company = User(Transaction().user).company
- def get_column(name, table, product):
+ def get_column(name, table):
+ if name == 'date':
+ return cls.effective_date.sql_cast(
+ Coalesce(table.effective_date, table.planned_date))
+ else:
+ return Column(table, name)
+
+ def get_column_product(name, table, product):
if name.startswith('product.'):
column = Column(product, name[len('product.'):])
else:
@@ -1189,8 +1197,9 @@
product = Product.__table__()
columns = ['id', 'state', 'effective_date', 'planned_date',
'internal_quantity', 'from_location', 'to_location', 'company']
- columns += [c for c in grouping if c not in columns]
- columns = [get_column(c, move, product) for c in columns]
+ columns += [
+ c for c in grouping if c not in columns and c != 'date']
+ columns = [get_column_product(c, move, product) for c in columns]
move = (move
.join(product, condition=move.product == product.id)
.select(*columns))
@@ -1204,7 +1213,8 @@
product_cache = Product.__table__()
columns = ['internal_quantity', 'period', 'location']
columns += [c for c in grouping if c not in columns]
- columns = [get_column(c, period_cache, product_cache)
+ columns = [
+ get_column_product(c, period_cache, product_cache)
for c in columns]
period_cache = (period_cache
.join(product_cache,
@@ -1225,7 +1235,7 @@
columns = ['id', 'state', 'effective_date', 'planned_date',
'internal_quantity', 'company']
columns += [c for c in grouping if c not in columns]
- columns = [Column(move, c).as_(c) for c in columns]
+ columns = [get_column(c, move).as_(c) for c in columns]
move_with_parent = (move
.join(from_location,
@@ -1452,7 +1462,7 @@
for fieldname, grouping_ids in zip(grouping, grouping_filter):
if not grouping_ids:
continue
- column = Column(move, fieldname)
+ column = get_column(fieldname, move)
if PeriodCache:
cache_column = Column(period_cache, fieldname)
if isinstance(grouping_ids[0], (int, float, Decimal)):
@@ -1480,8 +1490,8 @@
# One that sums incoming moves towards locations, one that sums
# outgoing moves and one for the period cache. UNION ALL is used
# because we already know that there will be no duplicates.
- move_keys_alias = [Column(move, key).as_(key) for key in grouping]
- move_keys = [Column(move, key) for key in grouping]
+ move_keys_alias = [get_column(key, move).as_(key) for key in grouping]
+ move_keys = [get_column(key, move) for key in grouping]
query = move.select(move.to_location.as_('location'),
Sum(move.internal_quantity).as_('quantity'),
*move_keys_alias,
@@ -1564,7 +1574,12 @@
for line in cursor:
if len(location_ids) > 1:
location = line[0]
- key = tuple(line[1:-1])
+ key = list(line[1:-1])
+ if 'date' in grouping:
+ i = grouping.index('date')
+ if not isinstance(key[i], datetime.date):
+ key[i] = datetime.date.fromisoformat(key[i])
+ key = tuple(key)
quantity = line[-1]
quantities[(location,) + key] += quantity
ids.add(id_getter(line))
diff -r 8e0c5d43ed6b -r 9dfa56e85ce4 tests/test_module.py
--- a/tests/test_module.py Thu Sep 08 13:10:15 2022 +0200
+++ b/tests/test_module.py Thu Sep 08 13:12:48 2022 +0200
@@ -665,6 +665,76 @@
products_by_location[(storage2.id, product.id)], 60)
@with_transaction()
+ def test_products_by_location_grouped_by_date(self):
+ "Test products_by_location grouped by date"
+ pool = Pool()
+ Uom = pool.get('product.uom')
+ Template = pool.get('product.template')
+ Product = pool.get('product.product')
+ Location = pool.get('stock.location')
+ Move = pool.get('stock.move')
+ Date = pool.get('ir.date')
+
+ unit, = Uom.search([('name', '=', 'Unit')])
+ template, = Template.create([{
+ 'name': "Template",
+ 'type': 'goods',
+ 'default_uom': unit.id,
+ }])
+ product, = Product.create([{
+ 'template': template.id,
+ }])
+ lost_found, = Location.search([('type', '=', 'lost_found')])
+ storage, = Location.search([('code', '=', 'STO')])
+ output, = Location.search([('code', '=', 'OUT')])
+ company = create_company()
+ with set_company(company):
+ today = Date.today()
+
+ moves = Move.create([{
+ 'product': product.id,
+ 'uom': unit.id,
+ 'quantity': 10,
+ 'from_location': lost_found.id,
+ 'to_location': storage.id,
+ 'effective_date': today,
+ 'company': company.id,
+ }, {
+ 'product': product.id,
+ 'uom': unit.id,
+ 'quantity': 2,
+ 'from_location': storage.id,
+ 'to_location': output.id,
+ 'effective_date': today + relativedelta(days=1),
+ 'company': company.id,
+ }, {
+ 'product': product.id,
+ 'uom': unit.id,
+ 'quantity': 3,
+ 'from_location': storage.id,
+ 'to_location': output.id,
+ 'effective_date': today + relativedelta(days=5),
+ 'company': company.id,
+ }])
+ with Transaction().set_context(_skip_warnings=True):
+ Move.do(moves)
+
+ with Transaction().set_context(stock_date_start=today):
+ products_by_location = Product.products_by_location(
+ [storage.id],
+ grouping=('date', 'product'),
+ grouping_filter=(None, [product.id]))
+
+ self.assertDictEqual(products_by_location, {
+ (storage.id,
+ today, product.id): 10,
+ (storage.id,
+ today + relativedelta(days=1), product.id): -2,
+ (storage.id,
+ today + relativedelta(days=5), product.id): -3,
+ })
+
+ @with_transaction()
def test_templates_by_location(self, period_closed=False):
"Test products_by_location grouped by template"
pool = Pool()