Script 'mail_helper' called by obssrc
Hello community,

here is the log from the commit of package python-kafka-python for 
openSUSE:Factory checked in at 2022-09-07 11:06:14
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-kafka-python (Old)
 and      /work/SRC/openSUSE:Factory/.python-kafka-python.new.2083 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "python-kafka-python"

Wed Sep  7 11:06:14 2022 rev:9 rq:1001511 version:2.0.2

Changes:
--------
--- /work/SRC/openSUSE:Factory/python-kafka-python/python-kafka-python.changes	2020-11-10 13:53:32.538839903 +0100
+++ /work/SRC/openSUSE:Factory/.python-kafka-python.new.2083/python-kafka-python.changes	2022-09-07 11:06:34.372488308 +0200
@@ -1,0 +2,7 @@
+Tue Sep  6 14:12:44 UTC 2022 - John Vandenberg <jay...@gmail.com>
+
+- Add test files missing from PyPI release:
+  conftest.py, fixtures.py, service.py & zookeeper.properties
+- Activate test suite
+
+-------------------------------------------------------------------

New:
----
  conftest.py
  fixtures.py
  service.py
  zookeeper.properties

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ python-kafka-python.spec ++++++
--- /var/tmp/diff_new_pack.BIwP3G/_old  2022-09-07 11:06:35.052490035 +0200
+++ /var/tmp/diff_new_pack.BIwP3G/_new  2022-09-07 11:06:35.056490046 +0200
@@ -1,7 +1,7 @@
 #
 # spec file for package python-kafka-python
 #
-# Copyright (c) 2020 SUSE LLC
+# Copyright (c) 2022 SUSE LLC
 #
 # All modifications and additions to the file contributed by third parties
 # remain the property of their copyright owners, unless otherwise agreed
@@ -25,12 +25,27 @@
 Group:          Development/Languages/Python
 URL:            https://github.com/mumrah/kafka-python
 Source:         https://files.pythonhosted.org/packages/source/k/kafka-python/kafka-python-%{version}.tar.gz
+Source1:        https://raw.githubusercontent.com/dpkp/kafka-python/master/servers/0.11.0.3/resources/zookeeper.properties
+Source2:        https://raw.githubusercontent.com/dpkp/kafka-python/master/test/conftest.py
+Source3:        https://raw.githubusercontent.com/dpkp/kafka-python/master/test/fixtures.py
+Source4:        https://raw.githubusercontent.com/dpkp/kafka-python/master/test/service.py
 BuildRequires:  %{python_module setuptools}
-BuildRequires:  %{python_module six}
 BuildRequires:  fdupes
 BuildRequires:  python-rpm-macros
-Requires:       python-six
+# Recommends:   python-crc32c  # Not packaged
+Recommends:     python-zstandard
+Suggests:       python-lz4
+Suggests:       python-xxhash
 BuildArch:      noarch
+# SECTION test requirements
+BuildRequires:  %{python_module lz4}
+BuildRequires:  %{python_module mock}
+BuildRequires:  %{python_module pytest-mock}
+BuildRequires:  %{python_module pytest}
+BuildRequires:  %{python_module python-snappy}
+BuildRequires:  %{python_module xxhash}
+BuildRequires:  %{python_module zstandard}
+# /SECTION
 %python_subpackages
 
 %description
@@ -41,6 +56,12 @@
 
 %prep
 %setup -q -n kafka-python-%{version}
+mkdir -p servers/0.11.0.2/resources/
+cp %{SOURCE1} servers/0.11.0.2/resources/
+
+cp %{SOURCE2} %{SOURCE3} %{SOURCE4} test/
+
+touch test/__init__.py
 
 %build
 %python_build
@@ -49,10 +70,13 @@
 %python_install
 %python_expand %fdupes %{buildroot}%{$python_sitelib}
 
+%check
+# test_kafka_producer_gc_cleanup is sometimes off by 1
+%pytest -rs -k 'not (test_kafka_consumer_offsets_for_time_old or test_kafka_producer_gc_cleanup)'
+
 %files %{python_files}
 %license LICENSE
 %doc README.rst
-%{python_sitelib}/*
-%exclude %{python_sitelib}/tests/
+%{python_sitelib}/kafka*/
 
 %changelog
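
The %check section above deselects two flaky integration tests via pytest's -k expression. For readers who want to reproduce that run locally outside the RPM macros, a minimal sketch using pytest's Python entry point (unpacked sources and installed test dependencies are assumed):

    import pytest

    # Hypothetical local equivalent of the %pytest call above: -rs reports the
    # reason for each skipped test, -k deselects the two flaky tests.
    exit_code = pytest.main([
        '-rs',
        '-k', 'not (test_kafka_consumer_offsets_for_time_old or test_kafka_producer_gc_cleanup)',
    ])
    raise SystemExit(exit_code)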

++++++ conftest.py ++++++
from __future__ import absolute_import

import uuid

import pytest

from test.testutil import env_kafka_version, random_string
from test.fixtures import KafkaFixture, ZookeeperFixture

@pytest.fixture(scope="module")
def zookeeper():
    """Return a Zookeeper fixture"""
    zk_instance = ZookeeperFixture.instance()
    yield zk_instance
    zk_instance.close()


@pytest.fixture(scope="module")
def kafka_broker(kafka_broker_factory):
    """Return a Kafka broker fixture"""
    return kafka_broker_factory()[0]


@pytest.fixture(scope="module")
def kafka_broker_factory(zookeeper):
    """Return a Kafka broker fixture factory"""
    assert env_kafka_version(), 'KAFKA_VERSION must be specified to run integration tests'

    _brokers = []
    def factory(**broker_params):
        params = {} if broker_params is None else broker_params.copy()
        params.setdefault('partitions', 4)
        num_brokers = params.pop('num_brokers', 1)
        brokers = tuple(KafkaFixture.instance(x, zookeeper, **params)
                        for x in range(num_brokers))
        _brokers.extend(brokers)
        return brokers

    yield factory

    for broker in _brokers:
        broker.close()


@pytest.fixture
def kafka_client(kafka_broker, request):
    """Return a KafkaClient fixture"""
    (client,) = kafka_broker.get_clients(cnt=1, client_id='%s_client' % (request.node.name,))
    yield client
    client.close()


@pytest.fixture
def kafka_consumer(kafka_consumer_factory):
    """Return a KafkaConsumer fixture"""
    return kafka_consumer_factory()


@pytest.fixture
def kafka_consumer_factory(kafka_broker, topic, request):
    """Return a KafkaConsumer factory fixture"""
    _consumer = [None]

    def factory(**kafka_consumer_params):
        params = {} if kafka_consumer_params is None else kafka_consumer_params.copy()
        params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
        params.setdefault('auto_offset_reset', 'earliest')
        _consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=[topic], **params))
        return _consumer[0]

    yield factory

    if _consumer[0]:
        _consumer[0].close()


@pytest.fixture
def kafka_producer(kafka_producer_factory):
    """Return a KafkaProducer fixture"""
    yield kafka_producer_factory()


@pytest.fixture
def kafka_producer_factory(kafka_broker, request):
    """Return a KafkaProduce factory fixture"""
    _producer = [None]

    def factory(**kafka_producer_params):
        params = {} if kafka_producer_params is None else kafka_producer_params.copy()
        params.setdefault('client_id', 'producer_%s' % (request.node.name,))
        _producer[0] = next(kafka_broker.get_producers(cnt=1, **params))
        return _producer[0]

    yield factory

    if _producer[0]:
        _producer[0].close()

@pytest.fixture
def kafka_admin_client(kafka_admin_client_factory):
    """Return a KafkaAdminClient fixture"""
    yield kafka_admin_client_factory()

@pytest.fixture
def kafka_admin_client_factory(kafka_broker):
    """Return a KafkaAdminClient factory fixture"""
    _admin_client = [None]

    def factory(**kafka_admin_client_params):
        params = {} if kafka_admin_client_params is None else kafka_admin_client_params.copy()
        _admin_client[0] = next(kafka_broker.get_admin_clients(cnt=1, **params))
        return _admin_client[0]

    yield factory

    if _admin_client[0]:
        _admin_client[0].close()

@pytest.fixture
def topic(kafka_broker, request):
    """Return a topic fixture"""
    topic_name = '%s_%s' % (request.node.name, random_string(10))
    kafka_broker.create_topics([topic_name])
    return topic_name


@pytest.fixture
def conn(mocker):
    """Return a connection mocker fixture"""
    from kafka.conn import ConnectionStates
    from kafka.future import Future
    from kafka.protocol.metadata import MetadataResponse
    conn = mocker.patch('kafka.client_async.BrokerConnection')
    conn.return_value = conn
    conn.state = ConnectionStates.CONNECTED
    conn.send.return_value = Future().success(
        MetadataResponse[0](
            [(0, 'foo', 12), (1, 'bar', 34)],  # brokers
            []))  # topics
    conn.blacked_out.return_value = False
    def _set_conn_state(state):
        conn.state = state
        return state
    conn._set_conn_state = _set_conn_state
    conn.connect.side_effect = lambda: conn.state
    conn.connect_blocking.return_value = True
    conn.connecting = lambda: conn.state in (ConnectionStates.CONNECTING,
                                             ConnectionStates.HANDSHAKE)
    conn.connected = lambda: conn.state is ConnectionStates.CONNECTED
    conn.disconnected = lambda: conn.state is ConnectionStates.DISCONNECTED
    return conn


@pytest.fixture()
def send_messages(topic, kafka_producer, request):
    """A factory that returns a send_messages function with a pre-populated
    topic topic / producer."""

    def _send_messages(number_range, partition=0, topic=topic, producer=kafka_producer, request=request):
        """
            messages is typically `range(0,100)`
            partition is an int
        """
        messages_and_futures = []  # [(message, produce_future),]
        for i in number_range:
            # request.node.name provides the test name (including parametrized values)
            encoded_msg = '{}-{}-{}'.format(i, request.node.name, uuid.uuid4()).encode('utf-8')
            future = kafka_producer.send(topic, value=encoded_msg, partition=partition)
            messages_and_futures.append((encoded_msg, future))
        kafka_producer.flush()
        for (msg, f) in messages_and_futures:
            assert f.succeeded()
        return [msg for (msg, f) in messages_and_futures]

    return _send_messages
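
Taken together, these fixtures wire a throw-away topic, a producer and a consumer to the broker spawned by the fixtures module. As a rough illustration only (not part of the shipped conftest.py, and runnable only with KAFKA_VERSION set for integration testing), a test built on them might look like:

    # Hypothetical integration test composed from the fixtures above.
    def test_round_trip(kafka_consumer, send_messages):
        sent = send_messages(range(10))          # produce 10 messages and wait for acks
        received = []
        for record in kafka_consumer:            # KafkaConsumer iterates over ConsumerRecords
            received.append(record.value)
            if len(received) == len(sent):       # stop once everything came back
                break
        assert set(received) == set(sent)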

++++++ fixtures.py ++++++
++++ 674 lines (skipped)

++++++ service.py ++++++
from __future__ import absolute_import

import logging
import os
import re
import select
import subprocess
import sys
import threading
import time

__all__ = [
    'ExternalService',
    'SpawnedService',
]

log = logging.getLogger(__name__)


class ExternalService(object):
    def __init__(self, host, port):
        log.info("Using already running service at %s:%d", host, port)
        self.host = host
        self.port = port

    def open(self):
        pass

    def close(self):
        pass


class SpawnedService(threading.Thread):
    def __init__(self, args=None, env=None):
        super(SpawnedService, self).__init__()

        if args is None:
            raise TypeError("args parameter is required")
        self.args = args
        self.env = env
        self.captured_stdout = []
        self.captured_stderr = []

        self.should_die = threading.Event()
        self.child = None
        self.alive = False
        self.daemon = True
        log.info("Created service for command:")
        log.info(" "+' '.join(self.args))
        log.debug("With environment:")
        for key, value in self.env.items():
            log.debug("  {key}={value}".format(key=key, value=value))

    def _spawn(self):
        if self.alive: return
        if self.child and self.child.poll() is None: return

        self.child = subprocess.Popen(
            self.args,
            preexec_fn=os.setsid, # to avoid propagating signals
            env=self.env,
            bufsize=1,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        self.alive = self.child.poll() is None

    def _despawn(self):
        if self.child.poll() is None:
            self.child.terminate()
        self.alive = False
        for _ in range(50):
            if self.child.poll() is not None:
                self.child = None
                break
            time.sleep(0.1)
        else:
            self.child.kill()

    def run(self):
        self._spawn()
        while True:
            try:
                (rds, _, _) = select.select([self.child.stdout, self.child.stderr], [], [], 1)
            except select.error as ex:
                if ex.args[0] == 4:
                    continue
                else:
                    raise

            if self.child.stdout in rds:
                line = self.child.stdout.readline().decode('utf-8').rstrip()
                if line:
                    self.captured_stdout.append(line)

            if self.child.stderr in rds:
                line = self.child.stderr.readline().decode('utf-8').rstrip()
                if line:
                    self.captured_stderr.append(line)

            if self.child.poll() is not None:
                self.dump_logs()
                break

            if self.should_die.is_set():
                self._despawn()
                break

    def dump_logs(self):
        sys.stderr.write('\n'.join(self.captured_stderr))
        sys.stdout.write('\n'.join(self.captured_stdout))

    def wait_for(self, pattern, timeout=30):
        start = time.time()
        while True:
            if not self.is_alive():
                raise RuntimeError("Child thread died already.")

            elapsed = time.time() - start
            if elapsed >= timeout:
                log.error("Waiting for %r timed out after %d seconds", pattern, 
timeout)
                return False

            if re.search(pattern, '\n'.join(self.captured_stdout), re.IGNORECASE) is not None:
                log.info("Found pattern %r in %d seconds via stdout", pattern, elapsed)
                return True
            if re.search(pattern, '\n'.join(self.captured_stderr), re.IGNORECASE) is not None:
                log.info("Found pattern %r in %d seconds via stderr", pattern, elapsed)
                return True
            time.sleep(0.1)

    def stop(self):
        self.should_die.set()
        self.join()

++++++ zookeeper.properties ++++++
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# 
#    http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir={tmp_dir}
# the port at which the clients will connect
clientPort={port}
clientPortAddress={host}
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
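
The {tmp_dir}, {port} and {host} markers above are not literal ZooKeeper settings; they are placeholders that the test fixtures substitute before launching ZooKeeper. The actual rendering lives in the skipped fixtures.py, but presumably amounts to something like the following str.format() call (the scratch directory name here is hypothetical):

    import os

    work_dir = '/tmp/zookeeper-fixture'              # hypothetical per-test scratch directory
    os.makedirs(work_dir, exist_ok=True)
    with open('servers/0.11.0.2/resources/zookeeper.properties') as f:
        template = f.read()
    rendered = template.format(tmp_dir=work_dir, host='127.0.0.1', port=2181)
    with open(os.path.join(work_dir, 'zookeeper.properties'), 'w') as f:
        f.write(rendered)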
