git-hulk commented on issue #2900:
URL: https://github.com/apache/kvrocks/issues/2900#issuecomment-2837848286

   @siren, thanks for the detailed information; it really helps us identify
this issue. We suspect it is caused by a data race between transaction mode
and blocking requests.
   
   Could you help verify whether the issue disappears after disabling the
transaction in the pipeline with `r.pipeline(transaction=False)`?
   
   The following script does not reproduce the crash on my side, but it may
help confirm that suspicion:
   
   ```Python
   import redis
   import time
   import threading
   
   def consumer_worker(r, queue_name):
       """Pop messages from a list with BRPOP until an exception ends the loop.

       Prints a start-up line, then loops forever: each iteration issues a
       blocking pop on ``queue_name`` and prints either the received message
       or a "still waiting" notice when the pop returns nothing.

       Args:
           r: Client object exposing ``brpop(name, timeout=...)``.
           queue_name: Name of the list key to pop from.

       Returns:
           None. The loop exits silently on ``redis.ConnectionError`` and
           exits after printing the error on any other ``Exception``.
       """
       print(f"Consumer: Waiting for messages on {queue_name}...")
       while True:
           try:
               # Use BRPOP to block until a message is available
               # (with 5 second timeout)
               result = r.brpop(queue_name, timeout=5)
               if result:
                   queue, message = result
                   # Decode into locals first so the print fits on one short
                   # line and is not split mid-string by mail re-wrapping.
                   text = message.decode('utf-8')
                   source = queue.decode('utf-8')
                   print(f"Consumer: Received '{text}' from {source}")
               else:
                   # No message received within timeout period
                   print("Consumer: No messages, still waiting...")
           except redis.ConnectionError:
               # This will happen when we close the connection
               break
           except Exception as e:
               print(f"Consumer error: {e}")
               break
   
   def main():
       """Drive the demo: four blocking consumers plus one pipelined producer.

       Clears ``demo-queue``, starts four daemon threads running
       ``consumer_worker`` on a shared client, then pushes 59 messages (one
       ``LPUSH`` per pipeline, 6 seconds apart), waits 70 seconds and closes
       both clients. Any exception is printed rather than propagated.
       """
       # Both clients target the same server
       conn_args = {'host': 'localhost', 'port': 6666, 'db': 0}
       producer = redis.Redis(**conn_args)
       consumer = redis.Redis(**conn_args)
       queue_name = "demo-queue"
       try:
           # Start from an empty queue
           producer.delete(queue_name)
           # Launch the consumer threads; they all share the one consumer client
           for _ in range(4):
               threading.Thread(
                   target=consumer_worker,
                   args=(consumer, queue_name),
                   daemon=True,
               ).start()

           # Producer: sleep first, then send one LPUSH through a fresh pipeline
           for seq in range(1, 60):
               time.sleep(6)
               message = f"Message {seq}"
               pipe = producer.pipeline()
               pipe.lpush(queue_name, message)

               # Echo every reply returned by the pipeline
               for reply in pipe.execute():
                   print(reply)

               print(f"Producer: Pushed '{message}'")
           print("Producer: Done pushing messages")
           # Give the consumers time to drain whatever is left
           time.sleep(70)
       except Exception as e:
           print(f"Error: {e}")
       finally:
           # Clean up; closing the clients is meant to end the consumer loops
           print("Disconnecting from Redis")
           producer.close()
           consumer.close()
   
   # Run the demo only when this file is executed directly as a script.
   if __name__ == "__main__":
       main()
   ``` 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to