Script 'mail_helper' called by obssrc Hello community, here is the log from the commit of package python-kafka-python for openSUSE:Factory checked in at 2022-09-07 11:06:14 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/python-kafka-python (Old) and /work/SRC/openSUSE:Factory/.python-kafka-python.new.2083 (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-kafka-python" Wed Sep 7 11:06:14 2022 rev:9 rq:1001511 version:2.0.2 Changes: -------- --- /work/SRC/openSUSE:Factory/python-kafka-python/python-kafka-python.changes 2020-11-10 13:53:32.538839903 +0100 +++ /work/SRC/openSUSE:Factory/.python-kafka-python.new.2083/python-kafka-python.changes 2022-09-07 11:06:34.372488308 +0200 @@ -1,0 +2,7 @@ +Tue Sep 6 14:12:44 UTC 2022 - John Vandenberg <jay...@gmail.com> + +- Add test files missing from PyPI release: + conftest.py, fixtures.py, service.py & zookeeper.properties +- Activate test suite + +------------------------------------------------------------------- New: ---- conftest.py fixtures.py service.py zookeeper.properties ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python-kafka-python.spec ++++++ --- /var/tmp/diff_new_pack.BIwP3G/_old 2022-09-07 11:06:35.052490035 +0200 +++ /var/tmp/diff_new_pack.BIwP3G/_new 2022-09-07 11:06:35.056490046 +0200 @@ -1,7 +1,7 @@ # # spec file for package python-kafka-python # -# Copyright (c) 2020 SUSE LLC +# Copyright (c) 2022 SUSE LLC # # All modifications and additions to the file contributed by third parties # remain the property of their copyright owners, unless otherwise agreed @@ -25,12 +25,27 @@ Group: Development/Languages/Python URL: https://github.com/mumrah/kafka-python Source: https://files.pythonhosted.org/packages/source/k/kafka-python/kafka-python-%{version}.tar.gz +Source1: https://raw.githubusercontent.com/dpkp/kafka-python/master/servers/0.11.0.3/resources/zookeeper.properties +Source2: https://raw.githubusercontent.com/dpkp/kafka-python/master/test/conftest.py +Source3: https://raw.githubusercontent.com/dpkp/kafka-python/master/test/fixtures.py +Source4: https://raw.githubusercontent.com/dpkp/kafka-python/master/test/service.py BuildRequires: %{python_module setuptools} -BuildRequires: %{python_module six} BuildRequires: fdupes BuildRequires: python-rpm-macros -Requires: 
python-six +# Recommends: python-crc32c # Not packaged +Recommends: python-zstandard +Suggests: python-lz4 +Suggests: python-xxhash BuildArch: noarch +# SECTION test requirements +BuildRequires: %{python_module lz4} +BuildRequires: %{python_module mock} +BuildRequires: %{python_module pytest-mock} +BuildRequires: %{python_module pytest} +BuildRequires: %{python_module python-snappy} +BuildRequires: %{python_module xxhash} +BuildRequires: %{python_module zstandard} +# /SECTION %python_subpackages %description @@ -41,6 +56,12 @@ %prep %setup -q -n kafka-python-%{version} +mkdir -p servers/0.11.0.2/resources/ +cp %{SOURCE1} servers/0.11.0.2/resources/ + +cp %{SOURCE2} %{SOURCE3} %{SOURCE4} test/ + +touch test/__init__.py %build %python_build @@ -49,10 +70,13 @@ %python_install %python_expand %fdupes %{buildroot}%{$python_sitelib} +%check +# test_kafka_producer_gc_cleanup is sometimes off by 1 +%pytest -rs -k 'not (test_kafka_consumer_offsets_for_time_old or test_kafka_producer_gc_cleanup)' + %files %{python_files} %license LICENSE %doc README.rst -%{python_sitelib}/* -%exclude %{python_sitelib}/tests/ +%{python_sitelib}/kafka*/ %changelog ++++++ conftest.py ++++++ from __future__ import absolute_import import uuid import pytest from test.testutil import env_kafka_version, random_string from test.fixtures import KafkaFixture, ZookeeperFixture @pytest.fixture(scope="module") def zookeeper(): """Return a Zookeeper fixture""" zk_instance = ZookeeperFixture.instance() yield zk_instance zk_instance.close() @pytest.fixture(scope="module") def kafka_broker(kafka_broker_factory): """Return a Kafka broker fixture""" return kafka_broker_factory()[0] @pytest.fixture(scope="module") def kafka_broker_factory(zookeeper): """Return a Kafka broker fixture factory""" assert env_kafka_version(), 'KAFKA_VERSION must be specified to run integration tests' _brokers = [] def factory(**broker_params): params = {} if broker_params is None else broker_params.copy() 
params.setdefault('partitions', 4) num_brokers = params.pop('num_brokers', 1) brokers = tuple(KafkaFixture.instance(x, zookeeper, **params) for x in range(num_brokers)) _brokers.extend(brokers) return brokers yield factory for broker in _brokers: broker.close() @pytest.fixture def kafka_client(kafka_broker, request): """Return a KafkaClient fixture""" (client,) = kafka_broker.get_clients(cnt=1, client_id='%s_client' % (request.node.name,)) yield client client.close() @pytest.fixture def kafka_consumer(kafka_consumer_factory): """Return a KafkaConsumer fixture""" return kafka_consumer_factory() @pytest.fixture def kafka_consumer_factory(kafka_broker, topic, request): """Return a KafkaConsumer factory fixture""" _consumer = [None] def factory(**kafka_consumer_params): params = {} if kafka_consumer_params is None else kafka_consumer_params.copy() params.setdefault('client_id', 'consumer_%s' % (request.node.name,)) params.setdefault('auto_offset_reset', 'earliest') _consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=[topic], **params)) return _consumer[0] yield factory if _consumer[0]: _consumer[0].close() @pytest.fixture def kafka_producer(kafka_producer_factory): """Return a KafkaProducer fixture""" yield kafka_producer_factory() @pytest.fixture def kafka_producer_factory(kafka_broker, request): """Return a KafkaProducer factory fixture""" _producer = [None] def factory(**kafka_producer_params): params = {} if kafka_producer_params is None else kafka_producer_params.copy() params.setdefault('client_id', 'producer_%s' % (request.node.name,)) _producer[0] = next(kafka_broker.get_producers(cnt=1, **params)) return _producer[0] yield factory if _producer[0]: _producer[0].close() @pytest.fixture def kafka_admin_client(kafka_admin_client_factory): """Return a KafkaAdminClient fixture""" yield kafka_admin_client_factory() @pytest.fixture def kafka_admin_client_factory(kafka_broker): """Return a KafkaAdminClient factory fixture""" _admin_client = [None] def 
factory(**kafka_admin_client_params): params = {} if kafka_admin_client_params is None else kafka_admin_client_params.copy() _admin_client[0] = next(kafka_broker.get_admin_clients(cnt=1, **params)) return _admin_client[0] yield factory if _admin_client[0]: _admin_client[0].close() @pytest.fixture def topic(kafka_broker, request): """Return a topic fixture""" topic_name = '%s_%s' % (request.node.name, random_string(10)) kafka_broker.create_topics([topic_name]) return topic_name @pytest.fixture def conn(mocker): """Return a connection mocker fixture""" from kafka.conn import ConnectionStates from kafka.future import Future from kafka.protocol.metadata import MetadataResponse conn = mocker.patch('kafka.client_async.BrokerConnection') conn.return_value = conn conn.state = ConnectionStates.CONNECTED conn.send.return_value = Future().success( MetadataResponse[0]( [(0, 'foo', 12), (1, 'bar', 34)], # brokers [])) # topics conn.blacked_out.return_value = False def _set_conn_state(state): conn.state = state return state conn._set_conn_state = _set_conn_state conn.connect.side_effect = lambda: conn.state conn.connect_blocking.return_value = True conn.connecting = lambda: conn.state in (ConnectionStates.CONNECTING, ConnectionStates.HANDSHAKE) conn.connected = lambda: conn.state is ConnectionStates.CONNECTED conn.disconnected = lambda: conn.state is ConnectionStates.DISCONNECTED return conn @pytest.fixture() def send_messages(topic, kafka_producer, request): """A factory that returns a send_messages function with a pre-populated topic / producer.""" def _send_messages(number_range, partition=0, topic=topic, producer=kafka_producer, request=request): """ messages is typically `range(0,100)` partition is an int """ messages_and_futures = [] # [(message, produce_future),] for i in number_range: # request.node.name provides the test name (including parametrized values) encoded_msg = '{}-{}-{}'.format(i, request.node.name, uuid.uuid4()).encode('utf-8') future = 
kafka_producer.send(topic, value=encoded_msg, partition=partition) messages_and_futures.append((encoded_msg, future)) kafka_producer.flush() for (msg, f) in messages_and_futures: assert f.succeeded() return [msg for (msg, f) in messages_and_futures] return _send_messages ++++++ fixtures.py ++++++ ++++ 674 lines (skipped) ++++++ service.py ++++++ from __future__ import absolute_import import logging import os import re import select import subprocess import sys import threading import time __all__ = [ 'ExternalService', 'SpawnedService', ] log = logging.getLogger(__name__) class ExternalService(object): def __init__(self, host, port): log.info("Using already running service at %s:%d", host, port) self.host = host self.port = port def open(self): pass def close(self): pass class SpawnedService(threading.Thread): def __init__(self, args=None, env=None): super(SpawnedService, self).__init__() if args is None: raise TypeError("args parameter is required") self.args = args self.env = env self.captured_stdout = [] self.captured_stderr = [] self.should_die = threading.Event() self.child = None self.alive = False self.daemon = True log.info("Created service for command:") log.info(" "+' '.join(self.args)) log.debug("With environment:") for key, value in self.env.items(): log.debug(" {key}={value}".format(key=key, value=value)) def _spawn(self): if self.alive: return if self.child and self.child.poll() is None: return self.child = subprocess.Popen( self.args, preexec_fn=os.setsid, # to avoid propagating signals env=self.env, bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.PIPE) self.alive = self.child.poll() is None def _despawn(self): if self.child.poll() is None: self.child.terminate() self.alive = False for _ in range(50): if self.child.poll() is not None: self.child = None break time.sleep(0.1) else: self.child.kill() def run(self): self._spawn() while True: try: (rds, _, _) = select.select([self.child.stdout, self.child.stderr], [], [], 1) except select.error as 
ex: if ex.args[0] == 4: continue else: raise if self.child.stdout in rds: line = self.child.stdout.readline().decode('utf-8').rstrip() if line: self.captured_stdout.append(line) if self.child.stderr in rds: line = self.child.stderr.readline().decode('utf-8').rstrip() if line: self.captured_stderr.append(line) if self.child.poll() is not None: self.dump_logs() break if self.should_die.is_set(): self._despawn() break def dump_logs(self): sys.stderr.write('\n'.join(self.captured_stderr)) sys.stdout.write('\n'.join(self.captured_stdout)) def wait_for(self, pattern, timeout=30): start = time.time() while True: if not self.is_alive(): raise RuntimeError("Child thread died already.") elapsed = time.time() - start if elapsed >= timeout: log.error("Waiting for %r timed out after %d seconds", pattern, timeout) return False if re.search(pattern, '\n'.join(self.captured_stdout), re.IGNORECASE) is not None: log.info("Found pattern %r in %d seconds via stdout", pattern, elapsed) return True if re.search(pattern, '\n'.join(self.captured_stderr), re.IGNORECASE) is not None: log.info("Found pattern %r in %d seconds via stderr", pattern, elapsed) return True time.sleep(0.1) def stop(self): self.should_die.set() self.join() ++++++ zookeeper.properties ++++++ # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and # limitations under the License. # the directory where the snapshot is stored. dataDir={tmp_dir} # the port at which the clients will connect clientPort={port} clientPortAddress={host} # disable the per-ip limit on the number of connections since this is a non-production config maxClientCnxns=0