Hello community,
here is the log from the commit of package python-iptables for openSUSE:Factory
checked in at 2023-12-08 22:32:20
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-iptables (Old)
and /work/SRC/openSUSE:Factory/.python-iptables.new.25432 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-iptables"
Fri Dec 8 22:32:20 2023 rev:8 rq:1131979 version:1.0.1
Changes:
--------
--- /work/SRC/openSUSE:Factory/python-iptables/python-iptables.changes 2018-06-28 15:13:41.179563489 +0200
+++ /work/SRC/openSUSE:Factory/.python-iptables.new.25432/python-iptables.changes 2023-12-08 22:32:52.153800900 +0100
@@ -1,0 +2,6 @@
+Thu Dec 7 22:57:15 UTC 2023 - Dirk Müller <[email protected]>
+
+- update to 1.0.1:
+ * https://github.com/ldx/python-iptables/compare/v0.13.0...v1.0.1
+
+-------------------------------------------------------------------
Old:
----
v0.13.0.tar.gz
New:
----
v1.0.1.tar.gz
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Other differences:
------------------
++++++ python-iptables.spec ++++++
--- /var/tmp/diff_new_pack.xc1sN4/_old 2023-12-08 22:32:52.585816796 +0100
+++ /var/tmp/diff_new_pack.xc1sN4/_new 2023-12-08 22:32:52.589816943 +0100
@@ -1,7 +1,7 @@
#
# spec file for package python-iptables
#
-# Copyright (c) 2018 SUSE LINUX GmbH, Nuernberg, Germany.
+# Copyright (c) 2023 SUSE LLC
#
# All modifications and additions to the file contributed by third parties
# remain the property of their copyright owners, unless otherwise agreed
@@ -12,13 +12,13 @@
# license that conforms to the Open Source Definition (Version 1.9)
# published by the Open Source Initiative.
-# Please submit bugfixes or comments via http://bugs.opensuse.org/
+# Please submit bugfixes or comments via https://bugs.opensuse.org/
#
%{?!python_module:%define python_module() python-%{**} python3-%{**}}
Name: python-iptables
-Version: 0.13.0
+Version: 1.0.1
Release: 0
Summary: Python bindings for iptables
License: Apache-2.0
++++++ v0.13.0.tar.gz -> v1.0.1.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/python-iptables-0.13.0/.github/workflows/build.yaml new/python-iptables-1.0.1/.github/workflows/build.yaml
--- old/python-iptables-0.13.0/.github/workflows/build.yaml 1970-01-01 01:00:00.000000000 +0100
+++ new/python-iptables-1.0.1/.github/workflows/build.yaml 2023-01-22 21:50:31.000000000 +0100
@@ -0,0 +1,84 @@
+name: Build & Test
+on:
+ push:
+ branches:
+ - main
+ tags:
+ - "v*"
+ pull_request:
+jobs:
+ build-and-test:
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ python-version:
+ - "3.7"
+ - "3.8"
+ - "3.9"
+ - "3.10"
+ - "3.11"
+ steps:
+ - name: Check out code
+ uses: actions/checkout@v2
+ - name: Set up Python ${{ matrix.python-version }}
+ uses: actions/setup-python@v2
+ with:
+ python-version: ${{ matrix.python-version }}
+ - name: Build package
+ run: |
+ python -m pip install --upgrade build twine
+ python -m build
+ twine check --strict dist/*
+ - name: Install coveralls
+ run: sudo pip install coveralls
+ - name: Run tests
+ run: sudo PATH=$PATH coverage run setup.py test
+
+ release:
+ runs-on: ubuntu-latest
+ if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags/v')
+ needs:
+ - build-and-test
+ steps:
+ - name: Check out code
+ uses: actions/checkout@v2
+ - name: Set up Python
+ uses: actions/setup-python@v2
+ with:
+ python-version: "3.x"
+ - name: Build package
+ run: |
+ python -m pip install --upgrade build twine
+ python -m build
+ twine check --strict dist/*
+ rm -f dist/*.whl
+ - name: Publish package
+ uses: pypa/gh-action-pypi-publish@release/v1
+ with:
+ user: __token__
+ password: ${{ secrets.PYPI_API_TOKEN }}
+ - name: Create GitHub release
+ id: create_release
+ uses: actions/create-release@v1
+ env:
+ GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+ with:
+ tag_name: ${{ github.ref }}
+ release_name: ${{ github.ref }}
+ draft: false
+ prerelease: false
+ - name: Set asset name
+ run: |
+ export PKG=$(ls dist/ | grep tar)
+ set -- $PKG
+ echo "name=$1" >> $GITHUB_ENV
+ - name: Upload release asset to GitHub
+ id: upload-release-asset
+ uses: actions/upload-release-asset@v1
+ env:
+ GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+ with:
+ upload_url: ${{ steps.create_release.outputs.upload_url }}
+ asset_path: dist/${{ env.name }}
+ asset_name: ${{ env.name }}
+ asset_content_type: application/zip
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/python-iptables-0.13.0/.travis.yml new/python-iptables-1.0.1/.travis.yml
--- old/python-iptables-0.13.0/.travis.yml 2018-04-17 02:19:37.000000000 +0200
+++ new/python-iptables-1.0.1/.travis.yml 1970-01-01 01:00:00.000000000 +0100
@@ -1,13 +0,0 @@
-language: python
-python:
- - "2.6"
- - "2.7"
- - "3.4"
-install:
- - python setup.py build
- - python setup.py install
- - sudo pip install coveralls
-script:
- - sudo PATH=$PATH coverage run setup.py test
-after_success:
- coveralls
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/python-iptables-0.13.0/README.md new/python-iptables-1.0.1/README.md
--- old/python-iptables-0.13.0/README.md 2018-04-17 02:19:37.000000000 +0200
+++ new/python-iptables-1.0.1/README.md 2023-01-22 21:50:31.000000000 +0100
@@ -135,6 +135,37 @@
Examples
========
+High level abstractions
+-----------------------
+
+``python-iptables`` implements a low-level interface that tries to closely
+match the underlying C libraries. The module ``iptc.easy`` improves the
+usability of the library by providing a rich set of high-level functions
+designed to simplify the interaction with the library, for example:
+
+ >>> import iptc
+ >>> iptc.easy.dump_table('nat', ipv6=False)
+ {'INPUT': [], 'OUTPUT': [], 'POSTROUTING': [], 'PREROUTING': []}
+ >>> iptc.easy.dump_chain('filter', 'OUTPUT', ipv6=False)
+ [{'comment': {'comment': 'DNS traffic to Google'},
+ 'counters': (1, 56),
+ 'dst': '8.8.8.8/32',
+ 'protocol': 'udp',
+ 'target': 'ACCEPT',
+ 'udp': {'dport': '53'}}]
+ >>> iptc.easy.add_chain('filter', 'TestChain')
+ True
+ >>> rule_d = {'protocol': 'tcp', 'target': 'ACCEPT', 'tcp': {'dport': '22'}}
+ >>> iptc.easy.insert_rule('filter', 'TestChain', rule_d)
+ >>> iptc.easy.dump_chain('filter', 'TestChain')
+ [{'protocol': 'tcp', 'target': 'ACCEPT', 'tcp': {'dport': '22'}}]
+ >>> iptc.easy.delete_chain('filter', 'TestChain', flush=True)
+
+ >>> # Example of goto rule // iptables -A FORWARD -p gre -g TestChainGoto
+ >>> iptc.easy.add_chain('filter', 'TestChainGoto')
+ >>> rule_goto_d = {'protocol': 'gre', 'target': {'goto': 'TestChainGoto'}}
+ >>> iptc.easy.insert_rule('filter', 'FORWARD', rule_goto_d)
+
Rules
-----
@@ -546,6 +577,21 @@
The drawback is that Table is a singleton, and if you disable
autocommit, it will be disabled for all instances of that Table.
+Easy rules with dictionaries
+----------------------------
+To simplify operations with ``python-iptables`` rules we have included support to define and convert Rules object into python dictionaries.
+
+ >>> import iptc
+ >>> table = iptc.Table(iptc.Table.FILTER)
+ >>> chain = iptc.Chain(table, "INPUT")
+ >>> # Create an iptc.Rule object from dictionary
+ >>> rule_d = {'comment': {'comment': 'Match tcp.22'}, 'protocol': 'tcp', 'target': 'ACCEPT', 'tcp': {'dport': '22'}}
+ >>> rule = iptc.easy.encode_iptc_rule(rule_d)
+ >>> # Obtain a dictionary representation from the iptc.Rule
+ >>> iptc.easy.decode_iptc_rule(rule)
+ {'tcp': {'dport': '22'}, 'protocol': 'tcp', 'comment': {'comment': 'Match tcp.22'}, 'target': 'ACCEPT'}
+
+
Known Issues
============
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/python-iptables-0.13.0/doc/examples.rst new/python-iptables-1.0.1/doc/examples.rst
--- old/python-iptables-0.13.0/doc/examples.rst 2018-04-17 02:19:37.000000000 +0200
+++ new/python-iptables-1.0.1/doc/examples.rst 2023-01-22 21:50:31.000000000 +0100
@@ -1,6 +1,37 @@
Examples
========
+High level abstractions
+-----------------------
+
+``python-iptables`` implements a low-level interface that tries to closely
+match the underlying C libraries. The module ``iptc.easy`` improves the
+usability of the library by providing a rich set of high-level functions
+designed to simplify the interaction with the library, for example:
+
+ >>> import iptc
+ >>> iptc.easy.dump_table('nat', ipv6=False)
+ {'INPUT': [], 'OUTPUT': [], 'POSTROUTING': [], 'PREROUTING': []}
+ >>> iptc.easy.dump_chain('filter', 'OUTPUT', ipv6=False)
+ [{'comment': {'comment': 'DNS traffic to Google'},
+ 'counters': (1, 56),
+ 'dst': '8.8.8.8/32',
+ 'protocol': 'udp',
+ 'target': 'ACCEPT',
+ 'udp': {'dport': '53'}}]
+ >>> iptc.easy.add_chain('filter', 'TestChain')
+ True
+ >>> rule_d = {'protocol': 'tcp', 'target': 'ACCEPT', 'tcp': {'dport': '22'}}
+ >>> iptc.easy.insert_rule('filter', 'TestChain', rule_d)
+ >>> iptc.easy.dump_chain('filter', 'TestChain')
+ [{'protocol': 'tcp', 'target': 'ACCEPT', 'tcp': {'dport': '22'}}]
+ >>> iptc.easy.delete_chain('filter', 'TestChain', flush=True)
+
+ >>> # Example of goto rule // iptables -A FORWARD -p gre -g TestChainGoto
+ >>> iptc.easy.add_chain('filter', 'TestChainGoto')
+ >>> rule_goto_d = {'protocol': 'gre', 'target': {'goto': 'TestChainGoto'}}
+ >>> iptc.easy.insert_rule('filter', 'FORWARD', rule_goto_d)
+
Rules
-----
@@ -410,6 +441,21 @@
The drawback is that `Table` is a singleton, and if you disable autocommit, it
will be disabled for all instances of that `Table`.
+Easy rules with dictionaries
+----------------------------
+To simplify operations with ``python-iptables`` rules we have included support to define and convert Rules object into python dictionaries.
+
+ >>> import iptc
+ >>> table = iptc.Table(iptc.Table.FILTER)
+ >>> chain = iptc.Chain(table, "INPUT")
+ >>> # Create an iptc.Rule object from dictionary
+ >>> rule_d = {'comment': {'comment': 'Match tcp.22'}, 'protocol': 'tcp', 'target': 'ACCEPT', 'tcp': {'dport': '22'}}
+ >>> rule = iptc.easy.encode_iptc_rule(rule_d)
+ >>> # Obtain a dictionary representation from the iptc.Rule
+ >>> iptc.easy.decode_iptc_rule(rule)
+ {'tcp': {'dport': '22'}, 'protocol': 'tcp', 'comment': {'comment': 'Match tcp.22'}, 'target': 'ACCEPT'}
+
+
Known Issues
============
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/python-iptables-0.13.0/iptc/__init__.py new/python-iptables-1.0.1/iptc/__init__.py
--- old/python-iptables-0.13.0/iptc/__init__.py 2018-04-17 02:19:37.000000000 +0200
+++ new/python-iptables-1.0.1/iptc/__init__.py 2023-01-22 21:50:31.000000000 +0100
@@ -10,6 +10,7 @@
from iptc.ip4tc import (is_table_available, Table, Chain, Rule, Match, Target,
Policy, IPTCError)
from iptc.ip6tc import is_table6_available, Table6, Rule6
from iptc.errors import *
+import iptc.easy
__all__ = []
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/python-iptables-0.13.0/iptc/easy.py new/python-iptables-1.0.1/iptc/easy.py
--- old/python-iptables-0.13.0/iptc/easy.py 1970-01-01 01:00:00.000000000 +0100
+++ new/python-iptables-1.0.1/iptc/easy.py 2023-01-22 21:50:31.000000000 +0100
@@ -0,0 +1,461 @@
+# -*- coding: utf-8 -*-
+
+# TODO:
+# - Add documentation
+# - Add HowToUse examples
+
+from .ip4tc import Rule, Table, Chain, IPTCError
+from .ip6tc import Rule6, Table6
+
+_BATCH_MODE = False
+
+def flush_all(ipv6=False):
+ """ Flush all available tables """
+ for table in get_tables(ipv6):
+ flush_table(table, ipv6)
+
+def flush_table(table, ipv6=False, raise_exc=True):
+ """ Flush a table """
+ try:
+ iptc_table = _iptc_gettable(table, ipv6)
+ iptc_table.flush()
+ except Exception as e:
+ if raise_exc: raise
+
+def flush_chain(table, chain, ipv6=False, raise_exc=True):
+ """ Flush a chain in table """
+ try:
+ iptc_chain = _iptc_getchain(table, chain, ipv6)
+ iptc_chain.flush()
+ except Exception as e:
+ if raise_exc: raise
+
+def zero_all(table, ipv6=False):
+ """ Zero all tables """
+ for table in get_tables(ipv6):
+ zero_table(table, ipv6)
+
+def zero_table(table, ipv6=False):
+ """ Zero a table """
+ iptc_table = _iptc_gettable(table, ipv6)
+ iptc_table.zero_entries()
+
+def zero_chain(table, chain, ipv6=False):
+ """ Zero a chain in table """
+ iptc_chain = _iptc_getchain(table, chain, ipv6)
+ iptc_chain.zero_counters()
+
+def has_chain(table, chain, ipv6=False):
+ """ Return True if chain exists in table False otherwise """
+ return _iptc_gettable(table, ipv6).is_chain(chain)
+
+def has_rule(table, chain, rule_d, ipv6=False):
+ """ Return True if rule exists in chain False otherwise """
+ iptc_chain = _iptc_getchain(table, chain, ipv6)
+ iptc_rule = encode_iptc_rule(rule_d, ipv6)
+ return iptc_rule in iptc_chain.rules
+
+def add_chain(table, chain, ipv6=False, raise_exc=True):
+ """ Return True if chain was added successfully to a table, raise Exception otherwise """
+ try:
+ iptc_table = _iptc_gettable(table, ipv6)
+ iptc_table.create_chain(chain)
+ return True
+ except Exception as e:
+ if raise_exc: raise
+ return False
+
+def add_rule(table, chain, rule_d, position=0, ipv6=False):
+ """ Add a rule to a chain in a given position. 0=append, 1=first, n=nth position """
+ iptc_chain = _iptc_getchain(table, chain, ipv6)
+ iptc_rule = encode_iptc_rule(rule_d, ipv6)
+ if position == 0:
+ # Insert rule in last position -> append
+ iptc_chain.append_rule(iptc_rule)
+ elif position > 0:
+ # Insert rule in given position -> adjusted as iptables CLI
+ iptc_chain.insert_rule(iptc_rule, position - 1)
+ elif position < 0:
+ # Insert rule in given position starting from bottom -> not available in iptables CLI
+ nof_rules = len(iptc_chain.rules)
+ _position = position + nof_rules
+ # Insert at the top if the position has looped over
+ if _position <= 0:
+ _position = 0
+ iptc_chain.insert_rule(iptc_rule, _position)
+
+def insert_rule(table, chain, rule_d, ipv6=False):
+ """ Add a rule to a chain in the 1st position """
+ add_rule(table, chain, rule_d, position=1, ipv6=ipv6)
+
+def delete_chain(table, chain, ipv6=False, flush=False, raise_exc=True):
+ """ Delete a chain """
+ try:
+ if flush:
+ flush_chain(table, chain, ipv6, raise_exc)
+ iptc_table = _iptc_gettable(table, ipv6)
+ iptc_table.delete_chain(chain)
+ except Exception as e:
+ if raise_exc: raise
+
+def delete_rule(table, chain, rule_d, ipv6=False, raise_exc=True):
+ """ Delete a rule from a chain """
+ try:
+ iptc_chain = _iptc_getchain(table, chain, ipv6)
+ iptc_rule = encode_iptc_rule(rule_d, ipv6)
+ iptc_chain.delete_rule(iptc_rule)
+ except Exception as e:
+ if raise_exc: raise
+
+def get_tables(ipv6=False):
+ """ Get all tables """
+ iptc_tables = _iptc_gettables(ipv6)
+ return [t.name for t in iptc_tables]
+
+def get_chains(table, ipv6=False):
+ """ Return the existing chains of a table """
+ iptc_table = _iptc_gettable(table, ipv6)
+ return [iptc_chain.name for iptc_chain in iptc_table.chains]
+
+def get_rule(table, chain, position=0, ipv6=False, raise_exc=True):
+ """ Get a rule from a chain in a given position. 0=all rules, 1=first, n=nth position """
+ try:
+ if position == 0:
+ # Return all rules
+ return dump_chain(table, chain, ipv6)
+ elif position > 0:
+ # Return specific rule by position
+ iptc_chain = _iptc_getchain(table, chain, ipv6)
+ iptc_rule = iptc_chain.rules[position - 1]
+ return decode_iptc_rule(iptc_rule, ipv6)
+ elif position < 0:
+ # Return last rule -> not available in iptables CLI
+ iptc_chain = _iptc_getchain(table, chain, ipv6)
+ iptc_rule = iptc_chain.rules[position]
+ return decode_iptc_rule(iptc_rule, ipv6)
+ except Exception as e:
+ if raise_exc: raise
+
+def replace_rule(table, chain, old_rule_d, new_rule_d, ipv6=False):
+ """ Replaces an existing rule of a chain """
+ iptc_chain = _iptc_getchain(table, chain, ipv6)
+ iptc_old_rule = encode_iptc_rule(old_rule_d, ipv6)
+ iptc_new_rule = encode_iptc_rule(new_rule_d, ipv6)
+ iptc_chain.replace_rule(iptc_new_rule, iptc_chain.rules.index(iptc_old_rule))
+
+def get_rule_counters(table, chain, rule_d, ipv6=False):
+ """ Return a tuple with the rule counters (numberOfBytes, numberOfPackets) """
+ if not has_rule(table, chain, rule_d, ipv6):
+ raise AttributeError('Chain <{}@{}> has no rule <{}>'.format(chain, table, rule_d))
+ iptc_chain = _iptc_getchain(table, chain, ipv6)
+ iptc_rule = encode_iptc_rule(rule_d, ipv6)
+ iptc_rule_index = iptc_chain.rules.index(iptc_rule)
+ return iptc_chain.rules[iptc_rule_index].get_counters()
+
+def get_rule_position(table, chain, rule_d, ipv6=False):
+ """ Return the position of a rule within a chain """
+ if not has_rule(table, chain, rule_d):
+ raise AttributeError('Chain <{}@{}> has no rule <{}>'.format(chain, table, rule_d))
+ iptc_chain = _iptc_getchain(table, chain, ipv6)
+ iptc_rule = encode_iptc_rule(rule_d, ipv6)
+ return iptc_chain.rules.index(iptc_rule)
+
+
+def test_rule(rule_d, ipv6=False):
+ """ Return True if the rule is a well-formed dictionary, False otherwise """
+ try:
+ encode_iptc_rule(rule_d, ipv6)
+ return True
+ except:
+ return False
+
+def test_match(name, value, ipv6=False):
+ """ Return True if the match is valid, False otherwise """
+ try:
+ iptc_rule = Rule6() if ipv6 else Rule()
+ _iptc_setmatch(iptc_rule, name, value)
+ return True
+ except:
+ return False
+
+def test_target(name, value, ipv6=False):
+ """ Return True if the target is valid, False otherwise """
+ try:
+ iptc_rule = Rule6() if ipv6 else Rule()
+ _iptc_settarget(iptc_rule, {name:value})
+ return True
+ except:
+ return False
+
+
+def get_policy(table, chain, ipv6=False):
+ """ Return the default policy of chain in a table """
+ iptc_chain = _iptc_getchain(table, chain, ipv6)
+ return iptc_chain.get_policy().name
+
+def set_policy(table, chain, policy='ACCEPT', ipv6=False):
+ """ Set the default policy of chain in a table """
+ iptc_chain = _iptc_getchain(table, chain, ipv6)
+ iptc_chain.set_policy(policy)
+
+
+def dump_all(ipv6=False):
+ """ Return a dictionary representation of all tables """
+ return {table: dump_table(table, ipv6) for table in get_tables(ipv6)}
+
+def dump_table(table, ipv6=False):
+ """ Return a dictionary representation of a table """
+ return {chain: dump_chain(table, chain, ipv6) for chain in get_chains(table, ipv6)}
+
+def dump_chain(table, chain, ipv6=False):
+ """ Return a list with the dictionary representation of the rules of a table """
+ iptc_chain = _iptc_getchain(table, chain, ipv6)
+ return [decode_iptc_rule(iptc_rule, ipv6) for iptc_rule in iptc_chain.rules]
+
+
+def batch_begin(table = None, ipv6=False):
+ """ Disable autocommit on a table """
+ _BATCH_MODE = True
+ if table:
+ tables = (table, )
+ else:
+ tables = get_tables(ipv6)
+ for table in tables:
+ iptc_table = _iptc_gettable(table, ipv6)
+ iptc_table.autocommit = False
+
+def batch_end(table = None, ipv6=False):
+ """ Enable autocommit on table and commit changes """
+ _BATCH_MODE = False
+ if table:
+ tables = (table, )
+ else:
+ tables = get_tables(ipv6)
+ for table in tables:
+ iptc_table = _iptc_gettable(table, ipv6)
+ iptc_table.autocommit = True
+
+def batch_add_chains(table, chains, ipv6=False, flush=True):
+ """ Add multiple chains to a table """
+ iptc_table = _batch_begin_table(table, ipv6)
+ for chain in chains:
+ if iptc_table.is_chain(chain):
+ iptc_chain = Chain(iptc_table, chain)
+ else:
+ iptc_chain = iptc_table.create_chain(chain)
+ if flush:
+ iptc_chain.flush()
+ _batch_end_table(table, ipv6)
+
+def batch_delete_chains(table, chains, ipv6=False):
+ """ Delete multiple chains of a table """
+ iptc_table = _batch_begin_table(table, ipv6)
+ for chain in chains:
+ if iptc_table.is_chain(chain):
+ iptc_chain = Chain(iptc_table, chain)
+ iptc_chain.flush()
+ iptc_table.delete_chain(chain)
+ _batch_end_table(table, ipv6)
+
+def batch_add_rules(table, batch_rules, ipv6=False):
+ """ Add multiple rules to a table with format (chain, rule_d, position) """
+ iptc_table = _batch_begin_table(table, ipv6)
+ for (chain, rule_d, position) in batch_rules:
+ iptc_chain = Chain(iptc_table, chain)
+ iptc_rule = encode_iptc_rule(rule_d, ipv6)
+ if position == 0:
+ # Insert rule in last position -> append
+ iptc_chain.append_rule(iptc_rule)
+ elif position > 0:
+ # Insert rule in given position -> adjusted as iptables CLI
+ iptc_chain.insert_rule(iptc_rule, position-1)
+ elif position < 0:
+ # Insert rule in given position starting from bottom -> not available in iptables CLI
+ nof_rules = len(iptc_chain.rules)
+ iptc_chain.insert_rule(iptc_rule, position + nof_rules)
+ _batch_end_table(table, ipv6)
+
+def batch_delete_rules(table, batch_rules, ipv6=False, raise_exc=True):
+ """ Delete multiple rules from table with format (chain, rule_d) """
+ try:
+ iptc_table = _batch_begin_table(table, ipv6)
+ for (chain, rule_d) in batch_rules:
+ iptc_chain = Chain(iptc_table, chain)
+ iptc_rule = encode_iptc_rule(rule_d, ipv6)
+ iptc_chain.delete_rule(iptc_rule)
+ _batch_end_table(table, ipv6)
+ except Exception as e:
+ if raise_exc: raise
+
+
+def encode_iptc_rule(rule_d, ipv6=False):
+ """ Return a Rule(6) object from the input dictionary """
+ # Sanity check
+ assert(isinstance(rule_d, dict))
+ # Basic rule attributes
+ rule_attr = ('src', 'dst', 'protocol', 'in-interface', 'out-interface', 'fragment')
+ iptc_rule = Rule6() if ipv6 else Rule()
+ # Set default target
+ rule_d.setdefault('target', '')
+ # Avoid issues with matches that require basic parameters to be configured first
+ for name in rule_attr:
+ if name in rule_d:
+ setattr(iptc_rule, name.replace('-', '_'), rule_d[name])
+ for name, value in rule_d.items():
+ try:
+ if name in rule_attr:
+ continue
+ elif name == 'counters':
+ _iptc_setcounters(iptc_rule, value)
+ elif name == 'target':
+ _iptc_settarget(iptc_rule, value)
+ else:
+ _iptc_setmatch(iptc_rule, name, value)
+ except Exception as e:
+ #print('Ignoring unsupported field <{}:{}>'.format(name, value))
+ continue
+ return iptc_rule
+
+def decode_iptc_rule(iptc_rule, ipv6=False):
+ """ Return a dictionary representation of the Rule(6) object
+ Note: host IP addresses are appended their corresponding CIDR """
+ d = {}
+ if ipv6==False and iptc_rule.src != '0.0.0.0/0.0.0.0':
+ _ip, _netmask = iptc_rule.src.split('/')
+ _netmask = _netmask_v4_to_cidr(_netmask)
+ d['src'] = '{}/{}'.format(_ip, _netmask)
+ elif ipv6==True and iptc_rule.src != '::/0':
+ d['src'] = iptc_rule.src
+ if ipv6==False and iptc_rule.dst != '0.0.0.0/0.0.0.0':
+ _ip, _netmask = iptc_rule.dst.split('/')
+ _netmask = _netmask_v4_to_cidr(_netmask)
+ d['dst'] = '{}/{}'.format(_ip, _netmask)
+ elif ipv6==True and iptc_rule.dst != '::/0':
+ d['dst'] = iptc_rule.dst
+ if iptc_rule.protocol != 'ip':
+ d['protocol'] = iptc_rule.protocol
+ if iptc_rule.in_interface is not None:
+ d['in-interface'] = iptc_rule.in_interface
+ if iptc_rule.out_interface is not None:
+ d['out-interface'] = iptc_rule.out_interface
+ if ipv6 == False and iptc_rule.fragment:
+ d['fragment'] = iptc_rule.fragment
+ for m in iptc_rule.matches:
+ if m.name not in d:
+ d[m.name] = m.get_all_parameters()
+ elif isinstance(d[m.name], list):
+ d[m.name].append(m.get_all_parameters())
+ else:
+ d[m.name] = [d[m.name], m.get_all_parameters()]
+ if iptc_rule.target and iptc_rule.target.name and len(iptc_rule.target.get_all_parameters()):
+ name = iptc_rule.target.name.replace('-', '_')
+ d['target'] = {name:iptc_rule.target.get_all_parameters()}
+ elif iptc_rule.target and iptc_rule.target.name:
+ if iptc_rule.target.goto:
+ d['target'] = {'goto':iptc_rule.target.name}
+ else:
+ d['target'] = iptc_rule.target.name
+ # Get counters
+ d['counters'] = iptc_rule.counters
+ # Return a filtered dictionary
+ return _filter_empty_field(d)
+
+### INTERNAL FUNCTIONS ###
+def _iptc_table_available(table, ipv6=False):
+ """ Return True if the table is available, False otherwise """
+ try:
+ iptc_table = Table6(table) if ipv6 else Table(table)
+ return True
+ except:
+ return False
+
+def _iptc_gettables(ipv6=False):
+ """ Return an updated view of all available iptc_table """
+ iptc_cls = Table6 if ipv6 else Table
+ return [_iptc_gettable(t, ipv6) for t in iptc_cls.ALL if _iptc_table_available(t, ipv6)]
+
+def _iptc_gettable(table, ipv6=False):
+ """ Return an updated view of an iptc_table """
+ iptc_table = Table6(table) if ipv6 else Table(table)
+ if _BATCH_MODE is False:
+ iptc_table.commit()
+ iptc_table.refresh()
+ return iptc_table
+
+def _iptc_getchain(table, chain, ipv6=False, raise_exc=True):
+ """ Return an iptc_chain of an updated table """
+ try:
+ iptc_table = _iptc_gettable(table, ipv6)
+ if not iptc_table.is_chain(chain):
+ raise AttributeError('Table <{}> has no chain <{}>'.format(table, chain))
+ return Chain(iptc_table, chain)
+ except Exception as e:
+ if raise_exc: raise
+
+def _iptc_setcounters(iptc_rule, value):
+ # Value is a tuple (numberOfBytes, numberOfPackets)
+ iptc_rule.counters = value
+
+def _iptc_setmatch(iptc_rule, name, value):
+ # Iterate list/tuple recursively
+ if isinstance(value, list) or isinstance(value, tuple):
+ for inner_value in value:
+ _iptc_setmatch(iptc_rule, name, inner_value)
+ # Assign dictionary value
+ elif isinstance(value, dict):
+ iptc_match = iptc_rule.create_match(name)
+ [iptc_match.set_parameter(k, v) for k, v in value.items()]
+ # Assign value directly
+ else:
+ iptc_match = iptc_rule.create_match(name)
+ iptc_match.set_parameter(name, value)
+
+def _iptc_settarget(iptc_rule, value):
+ # Target is dictionary - Use only 1st pair key/value
+ if isinstance(value, dict):
+ t_name, t_value = next(iter(value.items()))
+ if t_name == 'goto':
+ iptc_target = iptc_rule.create_target(t_value, goto=True)
+ else:
+ iptc_target = iptc_rule.create_target(t_name)
+ [iptc_target.set_parameter(k, v) for k, v in t_value.items()]
+ # Simple target
+ else:
+ iptc_target = iptc_rule.create_target(value)
+
+def _batch_begin_table(table, ipv6=False):
+ """ Disable autocommit on a table """
+ iptc_table = _iptc_gettable(table, ipv6)
+ iptc_table.autocommit = False
+ return iptc_table
+
+def _batch_end_table(table, ipv6=False):
+ """ Enable autocommit on table and commit changes """
+ iptc_table = _iptc_gettable(table, ipv6)
+ iptc_table.autocommit = True
+ return iptc_table
+
+def _filter_empty_field(data_d):
+ """
+ Remove empty lists from dictionary values
+ Before: {'target': {'CHECKSUM': {'checksum-fill': []}}}
+ After: {'target': {'CHECKSUM': {'checksum-fill': ''}}}
+ Before: {'tcp': {'dport': ['22']}}}
+ After: {'tcp': {'dport': '22'}}}
+ """
+ for k, v in data_d.items():
+ if isinstance(v, dict):
+ data_d[k] = _filter_empty_field(v)
+ elif isinstance(v, list) and len(v) != 0:
+ v = [_filter_empty_field(_v) if isinstance(_v, dict) else _v for _v in v ]
+ if isinstance(v, list) and len(v) == 1:
+ data_d[k] = v.pop()
+ elif isinstance(v, list) and len(v) == 0:
+ data_d[k] = ''
+ return data_d
+
+def _netmask_v4_to_cidr(netmask_addr):
+ # Implement Subnet Mask conversion without dependencies
+ return sum([bin(int(x)).count('1') for x in netmask_addr.split('.')])
+
+### /INTERNAL FUNCTIONS ###
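
Note on the position argument of add_rule in iptc/easy.py above: 0 appends, a positive n places the rule at the n-th position counted from 1 (as the iptables CLI does), and a negative value counts from the bottom, clamped to the top of the chain. The sketch below models only that index arithmetic on a plain Python list. The name add_at is hypothetical, and list.insert merely stands in for Chain.insert_rule, whose handling of out-of-range indexes may differ.

```python
def add_at(rules, rule, position=0):
    """Model of the add_rule position mapping on a plain list (illustrative only)."""
    if position == 0:
        # 0 means "append at the end of the chain"
        rules.append(rule)
    elif position > 0:
        # CLI-style positions start at 1, list indexes start at 0
        rules.insert(position - 1, rule)
    else:
        # Negative positions count from the bottom; clamp to the top
        index = max(position + len(rules), 0)
        rules.insert(index, rule)
    return rules


appended = add_at(["a", "b", "c"], "x", 0)
first = add_at(["a", "b", "c"], "x", 1)
second = add_at(["a", "b", "c"], "x", 2)
from_bottom = add_at(["a", "b", "c"], "x", -1)
clamped = add_at(["a", "b", "c"], "x", -5)
```

In the diff above, batch_add_rules applies the same mapping but without the clamp for negative positions.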
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/python-iptables-0.13.0/iptc/ip4tc.py new/python-iptables-1.0.1/iptc/ip4tc.py
--- old/python-iptables-0.13.0/iptc/ip4tc.py 2018-04-17 02:19:37.000000000 +0200
+++ new/python-iptables-1.0.1/iptc/ip4tc.py 2023-01-22 21:50:31.000000000 +0100
@@ -9,7 +9,7 @@
import struct
import weakref
-from .util import find_library, load_kernel
+from .util import find_library, load_kernel, find_libc
from .xtables import (XT_INV_PROTO, NFPROTO_IPV4, XTablesError, xtables,
xt_align, xt_counters, xt_entry_target, xt_entry_match)
@@ -26,7 +26,7 @@
_IFNAMSIZ = 16
-_libc = ct.CDLL("libc.so.6")
+_libc = find_libc()
_get_errno_loc = _libc.__errno_location
_get_errno_loc.restype = ct.POINTER(ct.c_int)
_malloc = _libc.malloc
@@ -419,6 +419,7 @@
res = shlex.split(buf)
res.reverse()
inv = False
+ key = None
while len(res) > 0:
x = res.pop()
if x == '!':
@@ -433,7 +434,11 @@
params[key] = []
inv = False
continue
- params[key].append(x) # This is a parameter value.
+ # At this point key should be set, unless the output from save is
+ # not formatted right. Let's be defensive, since some users
+ # reported that problem.
+ if key is not None:
+ params[key].append(x) # This is a parameter value.
return params
def _update_parameters(self):
@@ -574,9 +579,7 @@
def __eq__(self, match):
basesz = ct.sizeof(xt_entry_match)
- if (self.match.u.match_size == match.match.u.match_size and
- self.match.u.user.name == match.match.u.user.name and
- self.match.u.user.revision == match.match.u.user.revision and
+ if (self.name == match.name and
self.match_buf[basesz:self.usersize] ==
match.match_buf[basesz:match.usersize]):
return True
@@ -1064,9 +1067,6 @@
saddr = _a_to_i(socket.inet_pton(socket.AF_INET, addr))
except socket.error:
raise ValueError("invalid address %s" % (addr))
- ina = in_addr()
- ina.s_addr = ct.c_uint32(saddr)
- self.entry.ip.src = ina
if not netm.isdigit():
try:
@@ -1080,8 +1080,11 @@
nmask = socket.htonl((2 ** imask - 1) << (32 - imask))
neta = in_addr()
neta.s_addr = ct.c_uint32(nmask)
-
self.entry.ip.smsk = neta
+ # Apply subnet mask to IP address
+ ina = in_addr()
+ ina.s_addr = ct.c_uint32(saddr & nmask)
+ self.entry.ip.src = ina
src = property(get_src, set_src)
"""This is the source network address with an optional network mask in
@@ -1125,9 +1128,6 @@
daddr = _a_to_i(socket.inet_pton(socket.AF_INET, addr))
except socket.error:
raise ValueError("invalid address %s" % (addr))
- ina = in_addr()
- ina.s_addr = ct.c_uint32(daddr)
- self.entry.ip.dst = ina
if not netm.isdigit():
try:
@@ -1142,6 +1142,10 @@
neta = in_addr()
neta.s_addr = ct.c_uint32(nmask)
self.entry.ip.dmsk = neta
+ # Apply subnet mask to IP address
+ ina = in_addr()
+ ina.s_addr = ct.c_uint32(daddr & nmask)
+ self.entry.ip.dst = ina
dst = property(get_dst, set_dst)
"""This is the destination network address with an optional network mask
@@ -1282,6 +1286,15 @@
counters = self.entry.counters
return counters.pcnt, counters.bcnt
+ def set_counters(self, counters):
+ """This method set a tuple pair of the packet and byte counters of
+ the rule."""
+ self.entry.counters.pcnt = counters[0]
+ self.entry.counters.bcnt = counters[1]
+
+ counters = property(get_counters, set_counters)
+ """This is the packet and byte counters of the rule."""
+
# override the following three for the IPv6 subclass
def _entry_size(self):
return xt_align(ct.sizeof(ipt_entry))
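
Note on the set_src/set_dst hunks above: the address is now stored with the subnet mask already applied (saddr & nmask), so host bits are cleared before the entry is written. A minimal standalone sketch of that masking step follows. The helper name mask_ipv4 is hypothetical, and it works on host-order integers via struct rather than on the ctypes structures used in ip4tc.py.

```python
import socket
import struct


def mask_ipv4(addr, prefix_len):
    """Return the dotted-quad address with its host bits cleared (illustrative)."""
    if not 0 <= prefix_len <= 32:
        raise ValueError("invalid prefix length %r" % (prefix_len,))
    # Parse the dotted quad into a 32-bit integer (network byte order -> int)
    raw = struct.unpack("!I", socket.inet_aton(addr))[0]
    # Build the netmask: prefix_len leading one bits, the rest zero
    mask = (0xFFFFFFFF << (32 - prefix_len)) & 0xFFFFFFFF
    # Apply the mask and format the result back as a dotted quad
    return socket.inet_ntoa(struct.pack("!I", raw & mask))


network_24 = mask_ipv4("192.168.1.77", 24)
network_8 = mask_ipv4("10.1.2.3", 8)
```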
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/python-iptables-0.13.0/iptc/util.py new/python-iptables-1.0.1/iptc/util.py
--- old/python-iptables-0.13.0/iptc/util.py 2018-04-17 02:19:37.000000000 +0200
+++ new/python-iptables-1.0.1/iptc/util.py 2023-01-22 21:50:31.000000000 +0100
@@ -80,12 +80,19 @@
def _find_library(*names):
+ exts = []
if version_info >= (3, 3):
- ext = get_config_var("EXT_SUFFIX")
+ exts.append(get_config_var("EXT_SUFFIX"))
else:
- ext = get_config_var('SO')
+ exts.append(get_config_var('SO'))
+
+ if version_info >= (3, 5):
+ exts.append('.so')
+
for name in names:
- libnames = [name, "lib" + name, name + ext, "lib" + name + ext]
+ libnames = [name, "lib" + name]
+ for ext in exts:
+ libnames += [name + ext, "lib" + name + ext]
libdir = os.environ.get('IPTABLES_LIBDIR', None)
if libdir is not None:
libdirs = libdir.split(':')
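
Note on the _find_library hunk above: instead of a single extension suffix, the new code tries every suffix in a list, so a plain ".so" name is also considered on Python 3.5 and later. The sketch below reproduces only the candidate-name construction. The function name candidate_names and the sample suffix are assumptions for illustration; the real suffix comes from get_config_var.

```python
def candidate_names(name, exts):
    """Build the list of file names to probe for one library (illustrative)."""
    # Bare name and "lib"-prefixed name come first, as in the hunk
    names = [name, "lib" + name]
    # Then one pair of names per known extension suffix
    for ext in exts:
        names += [name + ext, "lib" + name + ext]
    return names


# Hypothetical suffix list: an EXT_SUFFIX-style value plus plain ".so"
probe = candidate_names("xtables", [".cpython-311-x86_64-linux-gnu.so", ".so"])
```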
@@ -109,3 +116,19 @@
major = int(m.group(1))
return lib, major
return None, None
+
+
+def find_libc():
+ lib = ctypes.util.find_library('c')
+ if lib is not None:
+ return ctypes.CDLL(lib, mode=ctypes.RTLD_GLOBAL)
+
+ libnames = ['libc.so.6', 'libc.so.0', 'libc.so']
+ for name in libnames:
+ try:
+ lib = ctypes.CDLL(name, mode=ctypes.RTLD_GLOBAL)
+ return lib
+ except:
+ pass
+
+ return None
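
Note on find_libc above: it first asks ctypes.util.find_library for the C library and only then falls back to a list of well-known sonames. A self-contained sketch of the same lookup order is shown below. The name load_libc is hypothetical, and unlike the hunk it also guards the first load attempt and catches OSError specifically rather than using a bare except.

```python
import ctypes
import ctypes.util


def load_libc():
    """Try the name reported by find_library, then common sonames (illustrative)."""
    located = ctypes.util.find_library("c")
    candidates = ([located] if located else []) + ["libc.so.6", "libc.so.0", "libc.so"]
    for name in candidates:
        try:
            return ctypes.CDLL(name, mode=ctypes.RTLD_GLOBAL)
        except OSError:
            # This candidate cannot be loaded on this system; try the next one
            continue
    return None


libc = load_libc()
```

Callers should be prepared for a None result: in the diff, ip4tc.py and xtables.py access attributes on the returned object right away.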
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/python-iptables-0.13.0/iptc/version.py new/python-iptables-1.0.1/iptc/version.py
--- old/python-iptables-0.13.0/iptc/version.py 2018-04-17 02:19:37.000000000 +0200
+++ new/python-iptables-1.0.1/iptc/version.py 2023-01-22 21:50:31.000000000 +0100
@@ -1,4 +1,4 @@
# -*- coding: utf-8 -*-
__pkgname__ = "python-iptables"
-__version__ = "0.13.0"
+__version__ = "1.0.1"
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/python-iptables-0.13.0/iptc/xtables.py new/python-iptables-1.0.1/iptc/xtables.py
--- old/python-iptables-0.13.0/iptc/xtables.py 2018-04-17 02:19:37.000000000 +0200
+++ new/python-iptables-1.0.1/iptc/xtables.py 2023-01-22 21:50:31.000000000 +0100
@@ -6,7 +6,7 @@
import weakref
from . import version
-from .util import find_library
+from .util import find_library, find_libc
from .errors import *
XT_INV_PROTO = 0x40 # invert the sense of PROTO
@@ -792,7 +792,7 @@
("v12", _xtables_target_v12)]
-_libc, _ = find_library("c")
+_libc = find_libc()
_optind = ct.c_long.in_dll(_libc, "optind")
_optarg = ct.c_char_p.in_dll(_libc, "optarg")
@@ -805,10 +805,10 @@
_xtables_libdir = os.getenv("XTABLES_LIBDIR")
if _xtables_libdir is None:
import re
- ldconfig_path_regex = re.compile('^(/.*):$')
+ ldconfig_path_regex = re.compile('^(/.*):($| \(.*\)$)')
import subprocess
ldconfig = subprocess.Popen(
- ('ldconfig', '-N', '-v'),
+ ('/sbin/ldconfig', '-N', '-v'),
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True
)
ldconfig_out, ldconfig_err = ldconfig.communicate()
@@ -1020,10 +1020,6 @@
if ext is not None:
return ext
- xtables._xtables_matches.value = ct.c_void_p(None).value
- if xtables._xtables_pending_matches:
- xtables._xtables_pending_matches.value = ct.c_void_p(None).value
-
match = xtables._xtables_find_match(name, XTF_TRY_LOAD, None)
if not match:
self._try_register(name)
@@ -1045,10 +1041,6 @@
if ext is not None:
return ext
- xtables._xtables_targets.value = ct.c_void_p(None).value
- if xtables._xtables_pending_targets:
- xtables._xtables_pending_targets.value = ct.c_void_p(None).value
-
target = xtables._xtables_find_target(name, XTF_TRY_LOAD)
if not target:
self._try_register(name)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/python-iptables-0.13.0/setup.py new/python-iptables-1.0.1/setup.py
--- old/python-iptables-0.13.0/setup.py 2018-04-17 02:19:37.000000000 +0200
+++ new/python-iptables-1.0.1/setup.py 2023-01-22 21:50:31.000000000 +0100
@@ -15,6 +15,8 @@
name=__pkgname__,
version=__version__,
description="Python bindings for iptables",
+ long_description="Python bindings for classic iptables",
+ long_description_content_type="text/x-rst",
author="Vilmos Nebehaj",
author_email="[email protected]",
url="https://github.com/ldx/python-iptables",
@@ -38,10 +40,7 @@
"Topic :: System :: Networking :: Firewalls",
"Topic :: System :: Systems Administration",
"Programming Language :: Python :: 2",
- "Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3",
- "Programming Language :: Python :: 3.3",
- "Programming Language :: Python :: 3.4",
"Programming Language :: Python :: Implementation :: CPython",
],
license="Apache License, Version 2.0",
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/python-iptables-0.13.0/tests/test_iptc.py new/python-iptables-1.0.1/tests/test_iptc.py
--- old/python-iptables-0.13.0/tests/test_iptc.py 2018-04-17 02:19:37.000000000 +0200
+++ new/python-iptables-1.0.1/tests/test_iptc.py 2023-01-22 21:50:31.000000000 +0100
@@ -574,6 +574,25 @@
self.failUnless(rule in crules)
crules.remove(rule)
+ def test_rule_to_dict(self):
+ rule = iptc.Rule6()
+ rule.protocol = "tcp"
+ rule.src = "::1/128"
+ target = iptc.Target(rule, "ACCEPT")
+ rule.target = target
+ rule_d = iptc.easy.decode_iptc_rule(rule, ipv6=True)
+ # Remove counters when comparing rules
+ rule_d.pop('counters', None)
+ self.assertEqual(rule_d, {"protocol": "tcp", "src": "::1/128", "target": "ACCEPT"})
+
+ def test_rule_from_dict(self):
+ rule = iptc.Rule6()
+ rule.protocol = "tcp"
+ rule.src = "::1/128"
+ target = iptc.Target(rule, "ACCEPT")
+ rule.target = target
+ rule2 = iptc.easy.encode_iptc_rule({"protocol": "tcp", "src": "::1/128", "target": "ACCEPT"}, ipv6=True)
+ self.assertEqual(rule, rule2)
class TestRule(unittest.TestCase):
def setUp(self):
@@ -603,13 +622,13 @@
def test_rule_address(self):
# valid addresses
rule = iptc.Rule()
- for addr in [("127.0.0.1/255.255.255.0", "127.0.0.1/255.255.255.0"),
- ("!127.0.0.1/255.255.255.0", "!127.0.0.1/255.255.255.0"),
- ("127.0.0.1/255.255.128.0", "127.0.0.1/255.255.128.0"),
- ("127.0.0.1/16", "127.0.0.1/255.255.0.0"),
- ("127.0.0.1/24", "127.0.0.1/255.255.255.0"),
- ("127.0.0.1/17", "127.0.0.1/255.255.128.0"),
- ("!127.0.0.1/17", "!127.0.0.1/255.255.128.0")]:
+ for addr in [("127.0.0.1/255.255.255.0", "127.0.0.0/255.255.255.0"),
+ ("!127.0.0.1/255.255.255.0", "!127.0.0.0/255.255.255.0"),
+ ("127.0.0.1/255.255.128.0", "127.0.0.0/255.255.128.0"),
+ ("127.0.0.1/16", "127.0.0.0/255.255.0.0"),
+ ("127.0.0.1/24", "127.0.0.0/255.255.255.0"),
+ ("127.0.0.1/17", "127.0.0.0/255.255.128.0"),
+ ("!127.0.0.1/17", "!127.0.0.0/255.255.128.0")]:
rule.src = addr[0]
self.assertEquals(rule.src, addr[1])
rule.dst = addr[0]
@@ -915,6 +934,25 @@
self.table_nat.commit()
self.table_nat.refresh()
+ def test_rule_to_dict(self):
+ rule = iptc.Rule()
+ rule.protocol = "tcp"
+ rule.src = "127.0.0.1/32"
+ target = iptc.Target(rule, "ACCEPT")
+ rule.target = target
+ rule_d = iptc.easy.decode_iptc_rule(rule)
+ # Remove counters when comparing rules
+ rule_d.pop('counters', None)
+ self.assertEqual(rule_d, {"protocol": "tcp", "src": "127.0.0.1/32", "target": "ACCEPT"})
+
+ def test_rule_from_dict(self):
+ rule = iptc.Rule()
+ rule.protocol = "tcp"
+ rule.src = "127.0.0.1/32"
+ target = iptc.Target(rule, "ACCEPT")
+ rule.target = target
+ rule2 = iptc.easy.encode_iptc_rule({"protocol": "tcp", "src": "127.0.0.1/32", "target": "ACCEPT"})
+ self.assertEqual(rule, rule2)
def suite():
suite_table6 = unittest.TestLoader().loadTestsFromTestCase(TestTable6)
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/python-iptables-0.13.0/tests/test_matches.py new/python-iptables-1.0.1/tests/test_matches.py
--- old/python-iptables-0.13.0/tests/test_matches.py 2018-04-17 02:19:37.000000000 +0200
+++ new/python-iptables-1.0.1/tests/test_matches.py 2023-01-22 21:50:31.000000000 +0100
@@ -19,7 +19,7 @@
match = rule.create_match("udp")
for m in rule.matches:
- self.failUnless(m == match)
+ self.assertEqual(m, match)
# check that we can change match parameters after creation
match.sport = "12345:55555"
@@ -29,7 +29,7 @@
m.sport = "12345:55555"
m.dport = "!33333"
- self.failUnless(m == match)
+ self.assertEqual(m, match)
def test_match_compare(self):
m1 = iptc.Match(iptc.Rule(), "udp")
@@ -40,28 +40,28 @@
m2.sport = "12345:55555"
m2.dport = "!33333"
- self.failUnless(m1 == m2)
+ self.assertEqual(m1, m2)
m2.reset()
m2.sport = "12345:55555"
m2.dport = "33333"
- self.failIf(m1 == m2)
+ self.assertNotEqual(m1, m2)
def test_match_parameters(self):
m = iptc.Match(iptc.Rule(), "udp")
m.sport = "12345:55555"
m.dport = "!33333"
- self.failUnless(len(m.parameters) == 2)
+ self.assertEqual(len(m.parameters), 2)
for p in m.parameters:
- self.failUnless(p == "sport" or p == "dport")
+ self.assertTrue(p == "sport" or p == "dport")
- self.failUnless(m.parameters["sport"] == "12345:55555")
- self.failUnless(m.parameters["dport"] == "!33333")
+ self.assertEqual(m.parameters["sport"], "12345:55555")
+ self.assertEqual(m.parameters["dport"], "!33333")
m.reset()
- self.failUnless(len(m.parameters) == 0)
+ self.assertEqual(len(m.parameters), 0)
def test_get_all_parameters(self):
m = iptc.Match(iptc.Rule(), "udp")
@@ -69,8 +69,8 @@
m.dport = "!33333"
params = m.get_all_parameters()
- self.assertEquals(set(params['sport']), set(['12345:55555']))
- self.assertEquals(set(params['dport']), set(['!', '33333']))
+ self.assertEqual(set(params['sport']), set(['12345:55555']))
+ self.assertEqual(set(params['dport']), set(['!', '33333']))
class TestMultiportMatch(unittest.TestCase):
@@ -103,14 +103,14 @@
self.chain.insert_rule(self.rule)
rule = self.chain.rules[0]
match = rule.matches[0]
- self.assertEquals(match.dports, '1111,2222')
+ self.assertEqual(match.dports, '1111,2222')
def test_unicode_multiport(self):
self.match.dports = u'1111,2222'
self.chain.insert_rule(self.rule)
rule = self.chain.rules[0]
match = rule.matches[0]
- self.assertEquals(match.dports, '1111,2222')
+ self.assertEqual(match.dports, '1111,2222')
class TestXTUdpMatch(unittest.TestCase):
@@ -135,9 +135,9 @@
"!12345:12346", "0:1234", "! 1234", "!0:12345",
"!1234:65535"]:
self.match.sport = port
- self.assertEquals(self.match.sport, port.replace(" ", ""))
+ self.assertEqual(self.match.sport, port.replace(" ", ""))
self.match.dport = port
- self.assertEquals(self.match.dport, port.replace(" ", ""))
+ self.assertEqual(self.match.dport, port.replace(" ", ""))
self.match.reset()
for port in ["-1", "asdf", "!asdf"]:
try:
@@ -188,7 +188,7 @@
def test_mark(self):
for mark in ["0x7b", "! 0x7b", "0x7b/0xfffefffe", "!0x7b/0xff00ff00"]:
self.match.mark = mark
- self.assertEquals(self.match.mark, mark.replace(" ", ""))
+ self.assertEqual(self.match.mark, mark.replace(" ", ""))
self.match.reset()
for mark in ["0xffffffffff", "123/0xffffffff1", "!asdf", "1234:1233"]:
try:
@@ -232,7 +232,7 @@
def test_limit(self):
for limit in ["1/sec", "5/min", "3/hour"]:
self.match.limit = limit
- self.assertEquals(self.match.limit, limit)
+ self.assertEqual(self.match.limit, limit)
self.match.reset()
for limit in ["asdf", "123/1", "!1", "!1/second"]:
try:
@@ -284,7 +284,7 @@
def test_icmpv6(self):
self.chain.insert_rule(self.rule)
rule = self.chain.rules[0]
- self.assertEquals(self.rule, rule)
+ self.assertEqual(self.rule, rule)
class TestCommentMatch(unittest.TestCase):
@@ -310,7 +310,7 @@
self.match.reset()
self.match.comment = comment
self.chain.insert_rule(self.rule)
- self.assertEquals(self.match.comment, comment)
+ self.assertEqual(self.match.comment, comment)
class TestIprangeMatch(unittest.TestCase):
@@ -389,8 +389,10 @@
self.chain.insert_rule(self.rule)
rule = self.chain.rules[0]
m = rule.matches[0]
- self.assertEquals(m.name, "state")
- self.assertEquals(m.state, "RELATED,ESTABLISHED")
+ self.assertEqual(m.name, "state")
+ self.assertEqual(m.state, "RELATED,ESTABLISHED")
+ self.assertEqual(rule.matches[0].name, self.rule.matches[0].name)
+ self.assertEqual(rule, self.rule)
class TestXTConntrackMatch(unittest.TestCase):
@@ -425,7 +427,7 @@
rule = self.chain.rules[0]
m = rule.matches[0]
self.assertTrue(m.name, ["conntrack"])
- self.assertEquals(m.ctstate, "NEW,RELATED")
+ self.assertEqual(m.ctstate, "NEW,RELATED")
class TestHashlimitMatch(unittest.TestCase):
@@ -464,11 +466,42 @@
rule = self.chain.rules[0]
m = rule.matches[0]
self.assertTrue(m.name, ["hashlimit"])
- self.assertEquals(m.hashlimit_name, "foo")
- self.assertEquals(m.hashlimit_mode, "srcip")
- self.assertEquals(m.hashlimit_upto, "200/sec")
- self.assertEquals(m.hashlimit_burst, "5")
+ self.assertEqual(m.hashlimit_name, "foo")
+ self.assertEqual(m.hashlimit_mode, "srcip")
+ self.assertEqual(m.hashlimit_upto, "200/sec")
+ self.assertEqual(m.hashlimit_burst, "5")
+class TestRecentMatch(unittest.TestCase):
+ def setUp(self):
+ self.table = 'filter'
+ self.chain = 'iptc_test_recent'
+ iptc.easy.delete_chain(self.table, self.chain, ipv6=False, flush=True, raise_exc=False)
+ iptc.easy.add_chain(self.table, self.chain, ipv6=False, raise_exc=True)
+
+ def tearDown(self):
+ iptc.easy.delete_chain(self.table, self.chain, ipv6=False, flush=True, raise_exc=False)
+
+ def test_recent(self):
+ rule_d = {
+ 'protocol': 'udp',
+ 'recent': {
+ 'mask': '255.255.255.255',
+ 'update': '',
+ 'seconds': '60',
+ 'rsource': '',
+ 'name': 'UDP-PORTSCAN',
+ },
+ 'target': {
+ 'REJECT':{
+ 'reject-with': 'icmp-port-unreachable'
+ }
+ }
+ }
+ iptc.easy.add_rule(self.table, self.chain, rule_d)
+ rule2_d = iptc.easy.get_rule(self.table, self.chain, -1)
+ # Remove counters when comparing rules
+ rule2_d.pop('counters', None)
+ self.assertEqual(rule_d, rule2_d)
def suite():
suite_match = unittest.TestLoader().loadTestsFromTestCase(TestMatch)
@@ -486,6 +519,8 @@
TestXTConntrackMatch)
suite_hashlimit = unittest.TestLoader().loadTestsFromTestCase(
TestHashlimitMatch)
+ suite_recent = unittest.TestLoader().loadTestsFromTestCase(
+ TestRecentMatch)
extra_suites = []
if is_table6_available(iptc.Table6.FILTER):
extra_suites += unittest.TestLoader().loadTestsFromTestCase(
@@ -494,7 +529,7 @@
return unittest.TestSuite([suite_match, suite_udp, suite_mark,
suite_limit, suite_mport, suite_comment,
suite_iprange, suite_state, suite_conntrack,
- suite_hashlimit] + extra_suites)
+ suite_hashlimit, suite_recent] + extra_suites)
def run_tests():