Downloading the forum
Hello all,
For a bit of a break from my usual projects, I thought I'd get back into web scraping. As an exercise to see how much data is generated, I thought I'd write a script to download this forum, almost in its entirety, and save it in an SQLite database.
I converted the text of posts to markdown, but other than that I left everything alone.
I'm posting this here so anyone who's interested in web scraping can see how it's done, not because I condone idiots storing everything they ever come across. I'm also not sure of the legality of holding that kind of data (although this script only downloads usernames), so I'll personally be deleting the database once it's done.
Honestly, the only thing I'm interested in is who's posted the most!
All of that said, here's the code for anyone who's interested:
"""The audiogames.net forum downloader."""
from datetime import datetime
from typing import Iterator, List, Optional, Union

from bs4 import BeautifulSoup
from bs4.element import NavigableString, Tag
from html2markdown import convert
from requests import Response
from requests import Session as RequestsSession
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, create_engine
from sqlalchemy.engine.base import Engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Query, relationship, sessionmaker
from sqlalchemy.orm.relationships import RelationshipProperty

# What BeautifulSoup's ``find`` / ``find_all`` may hand back.
FindType = Union[Tag, NavigableString]

engine: Engine = create_engine("sqlite:///db.sqlite3")


class _BaseClass:
    """Add an ID."""

    __tablename__: str
    id = Column(Integer, primary_key=True)

    def save(self) -> None:
        """Save this instance."""
        session.add(self)
        session.commit()

    @classmethod
    def query(cls, *args, **kwargs) -> Query:
        """Return a query linked to this class.

        :param args: SQL expressions passed to ``Query.filter``.

        :param kwargs: Column/value pairs passed to ``Query.filter_by``.
        """
        return session.query(cls).filter(*args).filter_by(**kwargs)

    @classmethod
    def count(cls, *args, **kwargs) -> int:
        """Return the number of rows that match the given criteria."""
        return cls.query(*args, **kwargs).count()


# The engine is handed to ``create_all`` and ``sessionmaker`` explicitly
# instead of being bound to the metadata: bound metadata was deprecated in
# SQLAlchemy 1.4 and removed in 2.0.
Base = declarative_base(cls=_BaseClass)
Session = sessionmaker(bind=engine)
# Module-wide session used by ``_BaseClass.save`` and ``_BaseClass.query``.
session = Session()


class NameMixin:
    """Add a name parameter."""

    id: int
    name = Column(String(1024), nullable=False)

    def __str__(self) -> str:
        """Return a string representation of this object."""
        return f"{self.name} (#{self.id})"


class User(Base, NameMixin):  # type:ignore[valid-type, misc]
    """A forum user."""

    __tablename__ = "users"
    name = Column(String(50), nullable=False)
    registered = Column(DateTime(timezone=True), nullable=True)


class Room(Base, NameMixin):  # type:ignore[valid-type, misc]
    """A room in the forum."""

    __tablename__ = "rooms"


class Thread(Base, NameMixin):  # type:ignore[valid-type, misc]
    """A forum thread."""

    __tablename__ = "threads"
    # Nullable: the starter is only known once the first post is parsed.
    user_id = Column(Integer, ForeignKey("users.id"), nullable=True)
    user: RelationshipProperty = relationship("User", backref="threads")
    room_id = Column(Integer, ForeignKey("rooms.id"), nullable=False)
    room: RelationshipProperty = relationship("Room", backref="threads")


class Post(Base):  # type:ignore[valid-type, misc]
    """A forum post."""

    __tablename__ = "posts"
    posted = Column(DateTime(timezone=True), nullable=False)
    text = Column(String(65535), nullable=True)
    # NOTE(review): this column definition was mangled in the posted source
    # (only ``nullable=False)`` survived); the String length is an assumption.
    url = Column(String(1024), nullable=False)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    user: RelationshipProperty = relationship("User", backref="posts")
    thread_id = Column(Integer, ForeignKey("threads.id"), nullable=False)
    thread: RelationshipProperty = relationship("Thread", backref="posts")


Base.metadata.create_all(engine)

url: str = "https://forum.audiogames.net/"
http: RequestsSession = RequestsSession()


def _fetch_soup(address: str) -> BeautifulSoup:
    """Download ``address`` with the shared HTTP session and parse it."""
    response: Response = http.get(address)
    return BeautifulSoup(response.text, "lxml")


def main() -> None:
    """Start scraping."""
    soup: BeautifulSoup = _fetch_soup(url)
    tags: Iterator[FindType] = soup.find_all("h3")
    h3: FindType
    for h3 in tags:
        assert isinstance(h3, Tag)
        parse_room(h3)


def parse_room(h3: Tag) -> None:
    """Parse a room from a link.

    :param h3: The level 3 heading containing the link from the main forum.

    :raises RuntimeError: If the heading does not contain a link.
    """
    a: Optional[FindType] = h3.find("a")
    if a is None or isinstance(a, NavigableString):
        raise RuntimeError("Invalid room link:\n%s" % h3)
    href: str = a["href"]
    name: str = a.text
    room: Optional[Room] = Room.query(name=name).first()
    if room is None:
        room = Room(name=name)
        room.save()
        print(f"Created room {room.name}.")
    else:
        print(f"Using existing room {room}.")
    soup = _fetch_soup(href)
    p: Optional[FindType] = soup.find("p", attrs={"class": "paging"})
    if p is None or isinstance(p, NavigableString):
        print("Cannot find page links for this room.")
        return
    links: List[FindType] = p.find_all("a")
    # The code below takes the second-to-last link as the highest page
    # number; with fewer than two links there is nothing to index.
    if len(links) < 2:
        print("Cannot find page links for this room.")
        return
    last_page = links[-2]
    assert isinstance(last_page, Tag)
    parse_pages(room, last_page)


def parse_pages(room: Room, a: Tag) -> None:
    """Parse pages of threads for a particular room.

    Pages are visited from the highest number down to 1.

    :param room: The room to work in.

    :param a: The link to the page with the highest number.
    """
    # NOTE(review): these two assignments were mangled in the posted source;
    # rebuilt from the surviving ``href.rindex("/") + 1] + "%d"`` fragment.
    href: str = a["href"]
    # Everything up to and including the last "/"; the page number follows.
    base: str = href[: href.rindex("/") + 1]
    last: int = int(a.text)
    for page in range(last, 0, -1):
        print(f"Parsing page {page}.")
        # Plain concatenation: "%" formatting would choke on a URL that
        # happens to contain a literal "%".
        soup = _fetch_soup(f"{base}{page}")
        for h3 in soup.find_all("h3"):
            assert isinstance(h3, Tag)
            parse_thread(room, h3)
        room.save()


def parse_thread(room: Room, h3: Tag) -> None:
    """Parse a particular thread in the given room.

    :param room: The room to work in.

    :param h3: The level 3 heading containing the link to the thread to parse.
    """
    a = h3.find("a")
    assert isinstance(a, Tag)
    name: str = a.text
    # NOTE(review): mangled to ``href = ""`` in the posted source; the link's
    # own target is the only value that makes the request below meaningful.
    href: str = a["href"]
    thread: Optional[Thread] = Thread.query(name=name, room=room).first()
    if thread is None:
        # Not saved here; ``parse_message`` calls ``thread.save()`` when it
        # meets the first post.
        thread = Thread(name=name, room=room)
    soup = _fetch_soup(href)
    for div in soup.find_all("div", attrs={"class": "post"}):
        assert isinstance(div, Tag)
        parse_message(thread, div)


def parse_message(thread: Thread, div: Tag) -> None:
    """Parse the given message.

    Posts whose ID is already in the database are skipped.

    :param thread: The thread this message will belong to.

    :param div: The div element containing the message to parse.
    """
    link_span: Optional[FindType] = div.find("span", attrs={"class": "post-link"})
    assert isinstance(link_span, Tag)
    # NOTE(review): the source of ``href`` was mangled to ``""`` in the posted
    # code. The slicing below expects "<forum url>post/<id>/..."; taking it
    # from the anchor inside the "post-link" span is an assumption - confirm
    # against the live markup.
    permalink: Optional[FindType] = link_span.find("a")
    assert isinstance(permalink, Tag)
    href: str = permalink["href"]
    id_text: str = href[len(url) :]
    id_text = id_text[len("post/") :].split("/")[0]
    # Convert once so the integer primary key is never compared to a string.
    post_id: int = int(id_text)
    if Post.count(id=post_id) > 0:
        print(f"Skipping message #{post_id}.")
        return

    byline: Optional[FindType] = div.find("span", attrs={"class": "post-byline"})
    assert isinstance(byline, Tag)
    username: str = byline.find("strong").text
    user: Optional[User] = User.query(name=username).first()
    if user is None:
        print(f"Creating user {username}.")
        ul: Optional[FindType] = div.find("ul", attrs={"class": "author-info"})
        assert isinstance(ul, Tag)
        registered_tag: Optional[FindType] = ul.find(
            lambda t: t.name == "span" and t.text.startswith("Registered:")
        )
        registered: Optional[datetime] = None
        if registered_tag is not None:
            registered = datetime.fromisoformat(registered_tag.find("strong").text)
        user = User(name=username, registered=registered)
        user.save()
    else:
        print(f"Using existing user {user}.")

    if "firstpost" in div["class"]:
        print(f"{username} is thread starter.")
        thread.user = user
        thread.save()

    content: Optional[FindType] = div.find("div", attrs={"class": "entry-content"})
    assert isinstance(content, Tag)
    # The first nested div is treated as the signature and left out below.
    signature: Optional[FindType] = content.find("div")
    posted: datetime = datetime.fromisoformat(link_span.text)
    # Only element children are converted; bare strings between them are
    # skipped.
    strings: List[str] = [
        convert(str(child))
        for child in content
        if not isinstance(child, NavigableString) and child is not signature
    ]
    post: Post = Post(
        id=post_id,
        posted=posted,
        text="\n\n".join(strings),
        user=user,
        thread=thread,
        # NOTE(review): mangled to ``url=""`` in the posted source; the
        # post's permalink is assumed.
        url=href,
    )
    print(f"Created post #{post_id}.")
    post.save()


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("Aborted.")
    finally:
        # Print totals however the run ended.
        print(f"Users: {User.count()}")
        print(f"Threads: {Thread.count()}")
        print(f"Posts: {Post.count()}.")
        session.close()
-- Audiogames-reflector mailing list Audiogames-reflector@sabahattin-gucukoglu.com https://sabahattin-gucukoglu.com/cgi-bin/mailman/listinfo/audiogames-reflector