Script 'mail_helper' called by obssrc
Hello community,
here is the log from the commit of package python-kafka-python for
openSUSE:Factory checked in at 2022-09-07 11:06:14
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-kafka-python (Old)
and /work/SRC/openSUSE:Factory/.python-kafka-python.new.2083 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-kafka-python"
Wed Sep 7 11:06:14 2022 rev:9 rq:1001511 version:2.0.2
Changes:
--------
--- /work/SRC/openSUSE:Factory/python-kafka-python/python-kafka-python.changes
2020-11-10 13:53:32.538839903 +0100
+++
/work/SRC/openSUSE:Factory/.python-kafka-python.new.2083/python-kafka-python.changes
2022-09-07 11:06:34.372488308 +0200
@@ -1,0 +2,7 @@
+Tue Sep 6 14:12:44 UTC 2022 - John Vandenberg <[email protected]>
+
+- Add test files missing from PyPI release:
+ conftest.py, fixtures.py, service.py & zookeeper.properties
+- Activate test suite
+
+-------------------------------------------------------------------
New:
----
conftest.py
fixtures.py
service.py
zookeeper.properties
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Other differences:
------------------
++++++ python-kafka-python.spec ++++++
--- /var/tmp/diff_new_pack.BIwP3G/_old 2022-09-07 11:06:35.052490035 +0200
+++ /var/tmp/diff_new_pack.BIwP3G/_new 2022-09-07 11:06:35.056490046 +0200
@@ -1,7 +1,7 @@
#
# spec file for package python-kafka-python
#
-# Copyright (c) 2020 SUSE LLC
+# Copyright (c) 2022 SUSE LLC
#
# All modifications and additions to the file contributed by third parties
# remain the property of their copyright owners, unless otherwise agreed
@@ -25,12 +25,27 @@
Group: Development/Languages/Python
URL: https://github.com/mumrah/kafka-python
Source:
https://files.pythonhosted.org/packages/source/k/kafka-python/kafka-python-%{version}.tar.gz
+Source1:
https://raw.githubusercontent.com/dpkp/kafka-python/master/servers/0.11.0.3/resources/zookeeper.properties
+Source2:
https://raw.githubusercontent.com/dpkp/kafka-python/master/test/conftest.py
+Source3:
https://raw.githubusercontent.com/dpkp/kafka-python/master/test/fixtures.py
+Source4:
https://raw.githubusercontent.com/dpkp/kafka-python/master/test/service.py
BuildRequires: %{python_module setuptools}
-BuildRequires: %{python_module six}
BuildRequires: fdupes
BuildRequires: python-rpm-macros
-Requires: python-six
+# Recommends: python-crc32c # Not packaged
+Recommends: python-zstandard
+Suggests: python-lz4
+Suggests: python-xxhash
BuildArch: noarch
+# SECTION test requirements
+BuildRequires: %{python_module lz4}
+BuildRequires: %{python_module mock}
+BuildRequires: %{python_module pytest-mock}
+BuildRequires: %{python_module pytest}
+BuildRequires: %{python_module python-snappy}
+BuildRequires: %{python_module xxhash}
+BuildRequires: %{python_module zstandard}
+# /SECTION
%python_subpackages
%description
@@ -41,6 +56,12 @@
%prep
%setup -q -n kafka-python-%{version}
+mkdir -p servers/0.11.0.2/resources/
+cp %{SOURCE1} servers/0.11.0.2/resources/
+
+cp %{SOURCE2} %{SOURCE3} %{SOURCE4} test/
+
+touch test/__init__.py
%build
%python_build
@@ -49,10 +70,13 @@
%python_install
%python_expand %fdupes %{buildroot}%{$python_sitelib}
+%check
+# test_kafka_producer_gc_cleanup is sometimes off by 1
+%pytest -rs -k 'not (test_kafka_consumer_offsets_for_time_old or
test_kafka_producer_gc_cleanup)'
+
%files %{python_files}
%license LICENSE
%doc README.rst
-%{python_sitelib}/*
-%exclude %{python_sitelib}/tests/
+%{python_sitelib}/kafka*/
%changelog
++++++ conftest.py ++++++
from __future__ import absolute_import
import uuid
import pytest
from test.testutil import env_kafka_version, random_string
from test.fixtures import KafkaFixture, ZookeeperFixture
@pytest.fixture(scope="module")
def zookeeper():
"""Return a Zookeeper fixture"""
zk_instance = ZookeeperFixture.instance()
yield zk_instance
zk_instance.close()
@pytest.fixture(scope="module")
def kafka_broker(kafka_broker_factory):
"""Return a Kafka broker fixture"""
return kafka_broker_factory()[0]
@pytest.fixture(scope="module")
def kafka_broker_factory(zookeeper):
"""Return a Kafka broker fixture factory"""
assert env_kafka_version(), 'KAFKA_VERSION must be specified to run
integration tests'
_brokers = []
def factory(**broker_params):
params = {} if broker_params is None else broker_params.copy()
params.setdefault('partitions', 4)
num_brokers = params.pop('num_brokers', 1)
brokers = tuple(KafkaFixture.instance(x, zookeeper, **params)
for x in range(num_brokers))
_brokers.extend(brokers)
return brokers
yield factory
for broker in _brokers:
broker.close()
@pytest.fixture
def kafka_client(kafka_broker, request):
"""Return a KafkaClient fixture"""
(client,) = kafka_broker.get_clients(cnt=1, client_id='%s_client' %
(request.node.name,))
yield client
client.close()
@pytest.fixture
def kafka_consumer(kafka_consumer_factory):
"""Return a KafkaConsumer fixture"""
return kafka_consumer_factory()
@pytest.fixture
def kafka_consumer_factory(kafka_broker, topic, request):
"""Return a KafkaConsumer factory fixture"""
_consumer = [None]
def factory(**kafka_consumer_params):
params = {} if kafka_consumer_params is None else
kafka_consumer_params.copy()
params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
params.setdefault('auto_offset_reset', 'earliest')
_consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=[topic],
**params))
return _consumer[0]
yield factory
if _consumer[0]:
_consumer[0].close()
@pytest.fixture
def kafka_producer(kafka_producer_factory):
"""Return a KafkaProducer fixture"""
yield kafka_producer_factory()
@pytest.fixture
def kafka_producer_factory(kafka_broker, request):
"""Return a KafkaProduce factory fixture"""
_producer = [None]
def factory(**kafka_producer_params):
params = {} if kafka_producer_params is None else
kafka_producer_params.copy()
params.setdefault('client_id', 'producer_%s' % (request.node.name,))
_producer[0] = next(kafka_broker.get_producers(cnt=1, **params))
return _producer[0]
yield factory
if _producer[0]:
_producer[0].close()
@pytest.fixture
def kafka_admin_client(kafka_admin_client_factory):
"""Return a KafkaAdminClient fixture"""
yield kafka_admin_client_factory()
@pytest.fixture
def kafka_admin_client_factory(kafka_broker):
"""Return a KafkaAdminClient factory fixture"""
_admin_client = [None]
def factory(**kafka_admin_client_params):
params = {} if kafka_admin_client_params is None else
kafka_admin_client_params.copy()
_admin_client[0] = next(kafka_broker.get_admin_clients(cnt=1, **params))
return _admin_client[0]
yield factory
if _admin_client[0]:
_admin_client[0].close()
@pytest.fixture
def topic(kafka_broker, request):
"""Return a topic fixture"""
topic_name = '%s_%s' % (request.node.name, random_string(10))
kafka_broker.create_topics([topic_name])
return topic_name
@pytest.fixture
def conn(mocker):
"""Return a connection mocker fixture"""
from kafka.conn import ConnectionStates
from kafka.future import Future
from kafka.protocol.metadata import MetadataResponse
conn = mocker.patch('kafka.client_async.BrokerConnection')
conn.return_value = conn
conn.state = ConnectionStates.CONNECTED
conn.send.return_value = Future().success(
MetadataResponse[0](
[(0, 'foo', 12), (1, 'bar', 34)], # brokers
[])) # topics
conn.blacked_out.return_value = False
def _set_conn_state(state):
conn.state = state
return state
conn._set_conn_state = _set_conn_state
conn.connect.side_effect = lambda: conn.state
conn.connect_blocking.return_value = True
conn.connecting = lambda: conn.state in (ConnectionStates.CONNECTING,
ConnectionStates.HANDSHAKE)
conn.connected = lambda: conn.state is ConnectionStates.CONNECTED
conn.disconnected = lambda: conn.state is ConnectionStates.DISCONNECTED
return conn
@pytest.fixture()
def send_messages(topic, kafka_producer, request):
"""A factory that returns a send_messages function with a pre-populated
topic topic / producer."""
def _send_messages(number_range, partition=0, topic=topic,
producer=kafka_producer, request=request):
"""
messages is typically `range(0,100)`
partition is an int
"""
messages_and_futures = [] # [(message, produce_future),]
for i in number_range:
# request.node.name provides the test name (including parametrized
values)
encoded_msg = '{}-{}-{}'.format(i, request.node.name,
uuid.uuid4()).encode('utf-8')
future = kafka_producer.send(topic, value=encoded_msg,
partition=partition)
messages_and_futures.append((encoded_msg, future))
kafka_producer.flush()
for (msg, f) in messages_and_futures:
assert f.succeeded()
return [msg for (msg, f) in messages_and_futures]
return _send_messages
++++++ fixtures.py ++++++
++++ 674 lines (skipped)
++++++ service.py ++++++
from __future__ import absolute_import
import logging
import os
import re
import select
import subprocess
import sys
import threading
import time
__all__ = [
'ExternalService',
'SpawnedService',
]
log = logging.getLogger(__name__)
class ExternalService(object):
def __init__(self, host, port):
log.info("Using already running service at %s:%d", host, port)
self.host = host
self.port = port
def open(self):
pass
def close(self):
pass
class SpawnedService(threading.Thread):
def __init__(self, args=None, env=None):
super(SpawnedService, self).__init__()
if args is None:
raise TypeError("args parameter is required")
self.args = args
self.env = env
self.captured_stdout = []
self.captured_stderr = []
self.should_die = threading.Event()
self.child = None
self.alive = False
self.daemon = True
log.info("Created service for command:")
log.info(" "+' '.join(self.args))
log.debug("With environment:")
for key, value in self.env.items():
log.debug(" {key}={value}".format(key=key, value=value))
def _spawn(self):
if self.alive: return
if self.child and self.child.poll() is None: return
self.child = subprocess.Popen(
self.args,
preexec_fn=os.setsid, # to avoid propagating signals
env=self.env,
bufsize=1,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
self.alive = self.child.poll() is None
def _despawn(self):
if self.child.poll() is None:
self.child.terminate()
self.alive = False
for _ in range(50):
if self.child.poll() is not None:
self.child = None
break
time.sleep(0.1)
else:
self.child.kill()
def run(self):
self._spawn()
while True:
try:
(rds, _, _) = select.select([self.child.stdout,
self.child.stderr], [], [], 1)
except select.error as ex:
if ex.args[0] == 4:
continue
else:
raise
if self.child.stdout in rds:
line = self.child.stdout.readline().decode('utf-8').rstrip()
if line:
self.captured_stdout.append(line)
if self.child.stderr in rds:
line = self.child.stderr.readline().decode('utf-8').rstrip()
if line:
self.captured_stderr.append(line)
if self.child.poll() is not None:
self.dump_logs()
break
if self.should_die.is_set():
self._despawn()
break
def dump_logs(self):
sys.stderr.write('\n'.join(self.captured_stderr))
sys.stdout.write('\n'.join(self.captured_stdout))
def wait_for(self, pattern, timeout=30):
start = time.time()
while True:
if not self.is_alive():
raise RuntimeError("Child thread died already.")
elapsed = time.time() - start
if elapsed >= timeout:
log.error("Waiting for %r timed out after %d seconds", pattern,
timeout)
return False
if re.search(pattern, '\n'.join(self.captured_stdout),
re.IGNORECASE) is not None:
log.info("Found pattern %r in %d seconds via stdout", pattern,
elapsed)
return True
if re.search(pattern, '\n'.join(self.captured_stderr),
re.IGNORECASE) is not None:
log.info("Found pattern %r in %d seconds via stderr", pattern,
elapsed)
return True
time.sleep(0.1)
def stop(self):
self.should_die.set()
self.join()
++++++ zookeeper.properties ++++++
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir={tmp_dir}
# the port at which the clients will connect
clientPort={port}
clientPortAddress={host}
# disable the per-ip limit on the number of connections since this is a
non-production config
maxClientCnxns=0