This is an automated email from the ASF dual-hosted git repository.

techdocsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a9aefbb0f Move from Jupyter notebook to Jupyter Lab and introduce a notebook folder structure (#14419)
1a9aefbb0f is described below

commit 1a9aefbb0f6dfe4eff752bf78bf2df221b5944a5
Author: Sergio Ferragut <[email protected]>
AuthorDate: Wed Jun 21 09:11:00 2023 -0700

    Move from Jupyter notebook to Jupyter Lab and introduce a notebook folder structure (#14419)
---
 examples/quickstart/jupyter-notebooks/Dockerfile   |   35 +-
 .../docker-jupyter/docker-compose-local.yaml       |   14 +-
 .../docker-jupyter/docker-compose.yaml             |   14 +-
 .../jupyter-notebooks/kafka-tutorial.ipynb         |  788 --------------
 .../01-introduction/00-START-HERE.ipynb}           |   12 +-
 .../01-druidapi-package-intro.ipynb}               |   18 +-
 .../02-ingestion/01-streaming-from-kafka.ipynb     |  537 ++++++++++
 .../notebooks/02-ingestion/DruidDataDriver.py      | 1133 ++++++++++++++++++++
 .../02-ingestion}/kafka_docker_config.json         |    0
 .../03-query/00-using-sql-with-druidapi.ipynb}     |    2 +-
 .../04-api/00-getting-started.ipynb}               |    2 +-
 11 files changed, 1713 insertions(+), 842 deletions(-)

diff --git a/examples/quickstart/jupyter-notebooks/Dockerfile b/examples/quickstart/jupyter-notebooks/Dockerfile
index 492a4da9c1..52e10175c2 100644
--- a/examples/quickstart/jupyter-notebooks/Dockerfile
+++ b/examples/quickstart/jupyter-notebooks/Dockerfile
@@ -31,13 +31,13 @@ FROM jupyter/base-notebook
 WORKDIR /home/jovyan
 
 # Install required Python packages
-RUN pip install requests
-RUN pip install pandas
-RUN pip install numpy
-RUN pip install seaborn
-RUN pip install bokeh
-RUN pip install kafka-python
-RUN pip install sortedcontainers
+RUN pip install requests \
+    pandas \
+    numpy \
+    seaborn \
+    bokeh \
+    kafka-python \
+    sortedcontainers
 
 # Install druidapi client from apache/druid
 # Local install requires sudo privileges 
@@ -45,21 +45,22 @@ USER root
 ADD druidapi /home/jovyan/druidapi
 WORKDIR /home/jovyan/druidapi
 RUN pip install .
-WORKDIR /home/jovyan
 
+
+
+# WIP -- install DruidDataDriver as a package
 # Import data generator and configuration file
 # Change permissions to allow import (requires sudo privileges)
-# WIP -- change to apache repo
-ADD https://raw.githubusercontent.com/shallada/druid/data-generator/examples/quickstart/jupyter-notebooks/data-generator/DruidDataDriver.py .
-ADD docker-jupyter/kafka_docker_config.json .
-RUN chmod 664 DruidDataDriver.py
-RUN chmod 664 kafka_docker_config.json
+
+# The Jupyter notebooks themselves are mounted into the image's /home/jovyan/notebooks
+# path when running this image.
+RUN mkdir -p /home/jovyan/notebooks
+
+WORKDIR /home/jovyan/notebooks
 USER jovyan
 
-# Copy the Jupyter notebook tutorials from the
-# build directory to the image working directory
-COPY ./*ipynb .
+
 
 # Add location of the data generator to PYTHONPATH
-ENV PYTHONPATH "${PYTHONPATH}:/home/jovyan"
+ENV PYTHONPATH "${PYTHONPATH}:/home/jovyan/notebooks/02-ingestion"
 
diff --git a/examples/quickstart/jupyter-notebooks/docker-jupyter/docker-compose-local.yaml b/examples/quickstart/jupyter-notebooks/docker-jupyter/docker-compose-local.yaml
index e0ea20c17b..3d7baef905 100644
--- a/examples/quickstart/jupyter-notebooks/docker-jupyter/docker-compose-local.yaml
+++ b/examples/quickstart/jupyter-notebooks/docker-jupyter/docker-compose-local.yaml
@@ -45,9 +45,7 @@ services:
   zookeeper:
     image: zookeeper:latest
     container_name: zookeeper
-    profiles: ["druid-jupyter", "all-services"]
-    ports:
-      - "2181:2181"
+    profiles: ["druid-jupyter", "kafka-jupyter", "all-services"]
     environment:
       - ZOO_MY_ID=1
       - ALLOW_ANONYMOUS_LOGIN=yes
@@ -55,7 +53,7 @@ services:
   kafka:
     image: bitnami/kafka:latest
     container_name: kafka-broker
-    profiles: ["all-services"]
+    profiles: ["kafka-jupyter", "all-services"]
     ports:
     # To learn about configuring Kafka for access across networks see
     # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
@@ -163,11 +161,13 @@ services:
       context: ..
       dockerfile: Dockerfile
     container_name: jupyter
-    profiles: ["jupyter", "druid-jupyter", "all-services"]
+    profiles: ["jupyter", "kafka-jupyter", "druid-jupyter", "all-services"]
     environment:
-      DOCKER_STACKS_JUPYTER_CMD: "notebook"
+      JUPYTER_ENABLE_LAB: "yes"
+      JUPYTER_TOKEN: "docker"
+      DOCKER_STACKS_JUPYTER_CMD: "lab"
       NOTEBOOK_ARGS: "--NotebookApp.token=''"
     ports:
       - "${JUPYTER_PORT:-8889}:8888"
     volumes:
-      - ./notebooks:/home/jovyan/work
+      - ../notebooks:/home/jovyan/notebooks
diff --git a/examples/quickstart/jupyter-notebooks/docker-jupyter/docker-compose.yaml b/examples/quickstart/jupyter-notebooks/docker-jupyter/docker-compose.yaml
index 9b784fe8de..e6f2cd95ae 100644
--- a/examples/quickstart/jupyter-notebooks/docker-jupyter/docker-compose.yaml
+++ b/examples/quickstart/jupyter-notebooks/docker-jupyter/docker-compose.yaml
@@ -45,9 +45,7 @@ services:
   zookeeper:
     image: zookeeper:latest
     container_name: zookeeper
-    profiles: ["druid-jupyter", "all-services"]
-    ports:
-      - "2181:2181"
+    profiles: ["druid-jupyter", "kafka-jupyter", "all-services"]
     environment:
       - ZOO_MY_ID=1
       - ALLOW_ANONYMOUS_LOGIN=yes
@@ -55,7 +53,7 @@ services:
   kafka:
     image: bitnami/kafka:latest
     container_name: kafka-broker
-    profiles: ["all-services"]
+    profiles: ["kafka-jupyter", "all-services"]
     ports:
     # To learn about configuring Kafka for access across networks see
     # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
@@ -161,11 +159,13 @@ services:
   jupyter:
     image: imply/druid-notebook:latest
     container_name: jupyter
-    profiles: ["jupyter", "druid-jupyter", "all-services"]
+    profiles: ["jupyter", "kafka-jupyter", "druid-jupyter", "all-services"]
     environment:
-      DOCKER_STACKS_JUPYTER_CMD: "notebook"
+      JUPYTER_ENABLE_LAB: "yes"
+      JUPYTER_TOKEN: "docker"
+      DOCKER_STACKS_JUPYTER_CMD: "lab"
       NOTEBOOK_ARGS: "--NotebookApp.token=''"
     ports:
       - "${JUPYTER_PORT:-8889}:8888"
     volumes:
-      - ./notebooks:/home/jovyan/work
+      - ../notebooks:/home/jovyan/notebooks
diff --git a/examples/quickstart/jupyter-notebooks/kafka-tutorial.ipynb b/examples/quickstart/jupyter-notebooks/kafka-tutorial.ipynb
deleted file mode 100644
index b25f49ff95..0000000000
--- a/examples/quickstart/jupyter-notebooks/kafka-tutorial.ipynb
+++ /dev/null
@@ -1,788 +0,0 @@
-{
- "cells": [
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "Ingest and query data from Apache Kafka\n",
-    "\n",
-    "<!--\n",
-    "  ~ Licensed to the Apache Software Foundation (ASF) under one\n",
-    "  ~ or more contributor license agreements.  See the NOTICE file\n",
-    "  ~ distributed with this work for additional information\n",
-    "  ~ regarding copyright ownership.  The ASF licenses this file\n",
-    "  ~ to you under the Apache License, Version 2.0 (the\n",
-    "  ~ \"License\"); you may not use this file except in compliance\n",
-    "  ~ with the License.  You may obtain a copy of the License at\n",
-    "  ~\n",
-    "  ~   http://www.apache.org/licenses/LICENSE-2.0\n",
-    "  ~\n",
-    "  ~ Unless required by applicable law or agreed to in writing,\n",
-    "  ~ software distributed under the License is distributed on an\n",
-    "  ~ \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
-    "  ~ KIND, either express or implied.  See the License for the\n",
-    "  ~ specific language governing permissions and limitations\n",
-    "  ~ under the License.\n",
-    "  -->\n",
-    "\n",
-    "This tutorial introduces you to streaming ingestion in Apache Druid using 
the Apache Kafka event streaming platform.\n",
-    "Follow along to learn how to create and load data into a Kafka topic, 
start ingesting data from the topic into Druid, and query results over time. 
This tutorial assumes you have a basic understanding of Druid ingestion, 
querying, and API requests."
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "## Table of contents\n",
-    "\n",
-    "* [Prerequisites](#Prerequisites)\n",
-    "* [Load Druid API client](#Load-Druid-API-client)\n",
-    "* [Create Kafka topic](#Create-Kafka-topic)\n",
-    "* [Load data into Kafka topic](#Load-data-into-Kafka-topic)\n",
-    "* [Start Druid ingestion](#Start-Druid-ingestion)\n",
-    "* [Query Druid datasource and visualize query 
results](#Query-Druid-datasource-and-visualize-query-results)\n",
-    "* [Learn more](#Learn-more)"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "## Prerequisites\n",
-    "\n",
-    "This tutorial works with Druid 25.0.0 or later.\n",
-    "\n",
-    "Launch this tutorial and all prerequisites using the `all-services` 
profile of the Docker Compose file for Jupyter-based Druid tutorials. For more 
information, see [Docker for Jupyter Notebook 
tutorials](https://druid.apache.org/docs/latest/tutorials/tutorial-jupyter-docker.html).\n",
-    "\n",
-    "If you do not use the Docker Compose environment, you need the 
following:\n",
-    "* A running Druid instance.\n",
-    "   * Update the `druid_host` variable to point to your Router endpoint. For example, `druid_host = \"http://localhost:8888\"`.\n",
-    "   * Update the `rest_client` variable to point to your Coordinator endpoint. For example, `\"http://localhost:8081\"`.\n",
-    "* A running Kafka cluster.\n",
-    "   * Update the Kafka bootstrap servers to point to your servers. For 
example, `bootstrap_servers=[\"localhost:9092\"]`.\n",
-    "* The following Python packages:\n",
-    "   * `druidapi`, a Python client for Apache Druid\n",
-    "   * `DruidDataDriver`, a data generator\n",
-    "   * `kafka`, a Python client for Apache Kafka\n",
-    "   * `pandas`, `matplotlib`, and `seaborn` for data visualization\n"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "## Load Druid API client"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "To start the tutorial, run the following cell. It imports the required 
Python packages and defines a variable for the Druid client, and another for 
the SQL client used to run SQL commands."
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 1,
-   "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "text/html": [
-       "\n",
-       "<style>\n",
-       "  .druid table {\n",
-       "    border: 1px solid black;\n",
-       "    border-collapse: collapse;\n",
-       "  }\n",
-       "\n",
-       "  .druid th, .druid td {\n",
-       "    padding: 4px 1em ;\n",
-       "    text-align: left;\n",
-       "  }\n",
-       "\n",
-       "  td.druid-right, th.druid-right {\n",
-       "    text-align: right;\n",
-       "  }\n",
-       "\n",
-       "  td.druid-center, th.druid-center {\n",
-       "    text-align: center;\n",
-       "  }\n",
-       "\n",
-       "  .druid .druid-left {\n",
-       "    text-align: left;\n",
-       "  }\n",
-       "\n",
-       "  .druid-alert {\n",
-       "    font-weight: bold;\n",
-       "  }\n",
-       "\n",
-       "  .druid-error {\n",
-       "    color: red;\n",
-       "  }\n",
-       "</style>\n"
-      ],
-      "text/plain": [
-       "<IPython.core.display.HTML object>"
-      ]
-     },
-     "metadata": {},
-     "output_type": "display_data"
-    }
-   ],
-   "source": [
-    "import druidapi\n",
-    "import json\n",
-    "\n",
-    "# druid_host is the hostname and port for your Druid deployment. \n",
-    "# In the Docker Compose tutorial environment, this is the Router\n",
-    "# service running at \"http://router:8888\".\n",
-    "# If you are not using the Docker Compose environment, edit the 
`druid_host`.\n",
-    "\n",
-    "druid_host = \"http://router:8888\"\n",
-    "druid_host\n",
-    "\n",
-    "druid = druidapi.jupyter_client(druid_host)\n",
-    "display = druid.display\n",
-    "sql_client = druid.sql\n",
-    "\n",
-    "# Create a rest client for native JSON ingestion for streaming data\n",
-    "rest_client = druidapi.rest.DruidRestClient(\"http://coordinator:8081\")"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "## Create Kafka topic"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "This notebook relies on the Python client for the Apache Kafka. Import 
the Kafka producer and consumer modules, then create a Kafka client. You use 
the Kafka producer to create and publish records to a new topic named 
`social_media`."
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 2,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "from kafka import KafkaProducer\n",
-    "from kafka import KafkaConsumer\n",
-    "\n",
-    "# Kafka runs on kafka:9092 in multi-container tutorial application\n",
-    "producer = KafkaProducer(bootstrap_servers='kafka:9092')\n",
-    "topic_name = \"social_media\""
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "Create the `social_media` topic and send a sample event. The `send()` 
command returns a metadata descriptor for the record."
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 3,
-   "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "text/plain": [
-       "<kafka.producer.future.FutureRecordMetadata at 0x7f5f65344610>"
-      ]
-     },
-     "execution_count": 3,
-     "metadata": {},
-     "output_type": "execute_result"
-    }
-   ],
-   "source": [
-    "event = {\n",
-    "    \"__time\": \"2023-01-03T16:40:21.501\",\n",
-    "    \"username\": \"willow\",\n",
-    "    \"post_title\": \"This title is required\",\n",
-    "    \"views\": 15284,\n",
-    "    \"upvotes\": 124,\n",
-    "    \"comments\": 21,\n",
-    "    \"edited\": \"True\"\n",
-    "}\n",
-    "\n",
-    "producer.send(topic_name, json.dumps(event).encode('utf-8'))"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "To verify that the Kafka topic stored the event, create a consumer client 
to read records from the Kafka cluster, and get the next (only) message:"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 4,
-   "metadata": {},
-   "outputs": [
-    {
-     "name": "stdout",
-     "output_type": "stream",
-     "text": [
-      "{\"__time\": \"2023-01-03T16:40:21.501\", \"username\": \"willow\", 
\"post_title\": \"This title is required\", \"views\": 15284, \"upvotes\": 124, 
\"comments\": 21, \"edited\": \"True\"}\n"
-     ]
-    }
-   ],
-   "source": [
-    "consumer = KafkaConsumer(topic_name, bootstrap_servers=['kafka:9092'], 
auto_offset_reset='earliest',\n",
-    "     enable_auto_commit=True)\n",
-    "\n",
-    "print(next(consumer).value.decode('utf-8'))"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "## Load data into Kafka topic"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "Instead of manually creating events to send to the Kafka topic, use a 
data generator to simulate a continuous data stream. This tutorial makes use of 
Druid Data Driver to simulate a continuous data stream into the `social_media` 
Kafka topic. To learn more about the Druid Data Driver, see the Druid Summit 
talk, [Generating Time centric Data for Apache 
Druid](https://www.youtube.com/watch?v=3zAOeLe3iAo).\n",
-    "\n",
-    "In this notebook, you use a background process to continuously load data 
into the Kafka topic.\n",
-    "This allows you to keep executing commands in this notebook while data is 
constantly being streamed into the topic."
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "Run the following cells to load sample data into the `social_media` Kafka 
topic:"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 5,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "import multiprocessing as mp\n",
-    "from datetime import datetime\n",
-    "import DruidDataDriver"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 6,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "def run_driver():\n",
-    "    DruidDataDriver.simulate(\"kafka_docker_config.json\", None, None, 
\"REAL\", datetime.now())\n",
-    "        \n",
-    "mp.set_start_method('fork')\n",
-    "ps = mp.Process(target=run_driver)\n",
-    "ps.start()"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "## Start Druid ingestion"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "Now that you have a new Kafka topic and data being streamed into the 
topic, you ingest the data into Druid by submitting a Kafka ingestion spec.\n",
-    "The ingestion spec describes the following:\n",
-    "* where to source the data to ingest (in `spec > ioConfig`),\n",
-    "* the datasource to ingest data into (in `spec > dataSchema > 
dataSource`), and\n",
-    "* what the data looks like (in `spec > dataSchema > dimensionsSpec`).\n",
-    "\n",
-    "Other properties control how Druid aggregates and stores data. For more 
information, see the Druid documenation:\n",
-    "* [Apache Kafka 
ingestion](https://druid.apache.org/docs/latest/development/extensions-core/kafka-ingestion.html)\n",
-    "* [Ingestion spec 
reference](https://druid.apache.org/docs/latest/ingestion/ingestion-spec.html)\n",
-    "\n",
-    "Run the following cells to define and view the Kafka ingestion spec."
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 7,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "kafka_ingestion_spec = \"{\\\"type\\\": \\\"kafka\\\",\\\"spec\\\": 
{\\\"ioConfig\\\": {\\\"type\\\": \\\"kafka\\\",\\\"consumerProperties\\\": 
{\\\"bootstrap.servers\\\": \\\"kafka:9092\\\"},\\\"topic\\\": 
\\\"social_media\\\",\\\"inputFormat\\\": {\\\"type\\\": 
\\\"json\\\"},\\\"useEarliestOffset\\\": true},\\\"tuningConfig\\\": 
{\\\"type\\\": \\\"kafka\\\"},\\\"dataSchema\\\": {\\\"dataSource\\\": 
\\\"social_media\\\",\\\"timestampSpec\\\": {\\\"column\\\": 
\\\"__time\\\",\\\"for [...]
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 8,
-   "metadata": {},
-   "outputs": [
-    {
-     "name": "stdout",
-     "output_type": "stream",
-     "text": [
-      "{\n",
-      "    \"type\": \"kafka\",\n",
-      "    \"spec\": {\n",
-      "        \"ioConfig\": {\n",
-      "            \"type\": \"kafka\",\n",
-      "            \"consumerProperties\": {\n",
-      "                \"bootstrap.servers\": \"kafka:9092\"\n",
-      "            },\n",
-      "            \"topic\": \"social_media\",\n",
-      "            \"inputFormat\": {\n",
-      "                \"type\": \"json\"\n",
-      "            },\n",
-      "            \"useEarliestOffset\": true\n",
-      "        },\n",
-      "        \"tuningConfig\": {\n",
-      "            \"type\": \"kafka\"\n",
-      "        },\n",
-      "        \"dataSchema\": {\n",
-      "            \"dataSource\": \"social_media\",\n",
-      "            \"timestampSpec\": {\n",
-      "                \"column\": \"__time\",\n",
-      "                \"format\": \"iso\"\n",
-      "            },\n",
-      "            \"dimensionsSpec\": {\n",
-      "                \"dimensions\": [\n",
-      "                    \"username\",\n",
-      "                    \"post_title\",\n",
-      "                    {\n",
-      "                        \"type\": \"long\",\n",
-      "                        \"name\": \"views\"\n",
-      "                    },\n",
-      "                    {\n",
-      "                        \"type\": \"long\",\n",
-      "                        \"name\": \"upvotes\"\n",
-      "                    },\n",
-      "                    {\n",
-      "                        \"type\": \"long\",\n",
-      "                        \"name\": \"comments\"\n",
-      "                    },\n",
-      "                    \"edited\"\n",
-      "                ]\n",
-      "            },\n",
-      "            \"granularitySpec\": {\n",
-      "                \"queryGranularity\": \"none\",\n",
-      "                \"rollup\": false,\n",
-      "                \"segmentGranularity\": \"hour\"\n",
-      "            }\n",
-      "        }\n",
-      "    }\n",
-      "}\n"
-     ]
-    }
-   ],
-   "source": [
-    "print(json.dumps(json.loads(kafka_ingestion_spec), indent=4))"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "Send the spec to Druid to start the streaming ingestion from Kafka:"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 9,
-   "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "text/plain": [
-       "<Response [200]>"
-      ]
-     },
-     "execution_count": 9,
-     "metadata": {},
-     "output_type": "execute_result"
-    }
-   ],
-   "source": [
-    "headers = {\n",
-    "  'Content-Type': 'application/json'\n",
-    "}\n",
-    "\n",
-    "rest_client.post(\"/druid/indexer/v1/supervisor\", kafka_ingestion_spec, 
headers=headers)"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "A `200` response indicates that the request was successful. You can view the running ingestion task and the new datasource in the web console at http://localhost:8888/unified-console.html."
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "## Query Druid datasource and visualize query results"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "You can now query the new datasource called `social_media`. In this 
section, you also visualize query results using the Matplotlib and Seaborn 
visualization libraries. Run the following cell import these packages."
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 10,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "import pandas as pd\n",
-    "import matplotlib\n",
-    "import matplotlib.pyplot as plt\n",
-    "import seaborn as sns"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "Run a simple query to view a subset of rows from the new datasource:"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 11,
-   "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "text/html": [
-       "<div class=\"druid\"><table>\n",
-       
"<tr><th>__time</th><th>username</th><th>post_title</th><th>views</th><th>upvotes</th><th>comments</th><th>edited</th></tr>\n",
-       "<tr><td>2023-01-03T16:40:21.501Z</td><td>willow</td><td>This title is 
required</td><td>15284</td><td>124</td><td>21</td><td>True</td></tr>\n",
-       
"<tr><td>2023-05-02T23:34:54.451Z</td><td>gus</td><td>3y4hkmd1!&#x27;Er4;</td><td>4031</td><td>93</td><td>15</td><td>False</td></tr>\n",
-       
"<tr><td>2023-05-02T23:34:55.454Z</td><td>mia</td><td>m62u53:D9s2bOvnY_VM9vjtZ&#x27;MyDLvQ7_xGodAP:ZNTXM6cFAt,_jrxBVBeRILLvAF9Z!jM9YNN;3ErV5eGbE_TFQS</td><td>16060</td><td>84</td><td>8</td><td>True</td></tr>\n",
-       
"<tr><td>2023-05-02T23:34:55.455Z</td><td>jojo</td><td>rAmeAJrjs;FBj:zy2MwoGh_P_SowlLTfp6zhX55xqogH.,1DC2xY_x2T;M_Vcu3QWaz650u;Roa</td><td>14313</td><td>65</td><td>7</td><td>False</td></tr>\n",
-       
"<tr><td>2023-05-02T23:34:56.456Z</td><td>willow</td><td>3bHB,iJdE;sedTDA,1dKGDAZL!qdsvO_tv.4Jrq7fa.KWcHPD&#x27;TB_5nnvsf9EgtnN8tGeeA0MjKc30iubJ:D&#x27;l7pHNihWpFz8K&#x27;46q!vJs</td><td>4237</td><td>112</td><td>3</td><td>True</td></tr>\n",
-       "</table></div>"
-      ],
-      "text/plain": [
-       "<IPython.core.display.HTML object>"
-      ]
-     },
-     "metadata": {},
-     "output_type": "display_data"
-    }
-   ],
-   "source": [
-    "sql = '''\n",
-    "SELECT * FROM social_media LIMIT 5\n",
-    "'''\n",
-    "display.sql(sql)"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "In this social media scenario, each incoming event represents a post on 
social media, for which you collect the timestamp, username, and post metadata. 
You are interested in analyzing the total number of upvotes for all posts, 
compared between users. Preview this data with the following query:"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 12,
-   "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "text/html": [
-       "<div class=\"druid\"><table>\n",
-       "<tr><th>num_posts</th><th>total_upvotes</th><th>username</th></tr>\n",
-       "<tr><td>155</td><td>10985</td><td>willow</td></tr>\n",
-       "<tr><td>161</td><td>11223</td><td>gus</td></tr>\n",
-       "<tr><td>164</td><td>11456</td><td>leon</td></tr>\n",
-       "<tr><td>173</td><td>12098</td><td>jojo</td></tr>\n",
-       "<tr><td>176</td><td>12175</td><td>mia</td></tr>\n",
-       "<tr><td>177</td><td>11998</td><td>milton</td></tr>\n",
-       "<tr><td>185</td><td>13256</td><td>miette</td></tr>\n",
-       "<tr><td>188</td><td>13360</td><td>rocket</td></tr>\n",
-       "</table></div>"
-      ],
-      "text/plain": [
-       "<IPython.core.display.HTML object>"
-      ]
-     },
-     "metadata": {},
-     "output_type": "display_data"
-    }
-   ],
-   "source": [
-    "sql = '''\n",
-    "SELECT\n",
-    "  COUNT(post_title) as num_posts,\n",
-    "  SUM(upvotes) as total_upvotes,\n",
-    "  username\n",
-    "FROM social_media\n",
-    "GROUP BY username\n",
-    "ORDER BY num_posts\n",
-    "'''\n",
-    "\n",
-    "response = sql_client.sql_query(sql)\n",
-    "response.show()"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "Visualize the total number of upvotes per user using a line plot. You 
sort the results by username before plotting because the order of users may 
vary as new results arrive."
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 13,
-   "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "image/png": 
"iVBORw0KGgoAAAANSUhEUgAAAk0AAAHMCAYAAADI/py4AAAAOXRFWHRTb2Z0d2FyZQBNYXRwbG90bGliIHZlcnNpb24zLjcuMCwgaHR0cHM6Ly9tYXRwbG90bGliLm9yZy88F64QAAAACXBIWXMAAA9hAAAPYQGoP6dpAACRN0lEQVR4nOzdd3iTZfcH8O+T7j3ppNAySwdtAQtF9p7KEEVl+JMhvjJERJYyVBRRnCjI60AcrwqyQaBskLJbSiktUArdmzZd6UjO7480sWE2Je2TpOdzXb20z/M0OUlpcnKf+z63QEQExhhjjDH2UBKxA2CMMcYYMwScNDHGGGOM1QEnTYwxxhhjdcBJE2OMMcZYHXDSxBhjjDFWB5w0McYYY4zVASdNjDHGGGN1YCp2AMZCoVAgIyMDdnZ2EARB7HAYY4wxVgdEhOLiYnh5eUEiefhYEidNOpKRkQEfHx+xw2CM
 [...]
-      "text/plain": [
-       "<Figure size 640x480 with 1 Axes>"
-      ]
-     },
-     "metadata": {},
-     "output_type": "display_data"
-    }
-   ],
-   "source": [
-    "df = pd.DataFrame(response.json)\n",
-    "df = df.sort_values('username')\n",
-    "\n",
-    "df.plot(x='username', y='total_upvotes', marker='o')\n",
-    "plt.xticks(rotation=45, ha='right')\n",
-    "plt.ylabel(\"Total number of upvotes\")\n",
-    "plt.gca().get_legend().remove()\n",
-    "plt.show()"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "The total number of upvotes likely depends on the total number of posts 
created per user. To better assess the relative impact per user, you compare 
the total number of upvotes (line plot) with the total number of posts."
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 14,
-   "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "text/plain": [
-       "<matplotlib.legend.Legend at 0x7f5f18400310>"
-      ]
-     },
-     "execution_count": 14,
-     "metadata": {},
-     "output_type": "execute_result"
-    },
-    {
-     "data": {
-      "image/png": 
"iVBORw0KGgoAAAANSUhEUgAAA1cAAAHMCAYAAAA5/FJZAAAAOXRFWHRTb2Z0d2FyZQBNYXRwbG90bGliIHZlcnNpb24zLjcuMCwgaHR0cHM6Ly9tYXRwbG90bGliLm9yZy88F64QAAAACXBIWXMAAA9hAAAPYQGoP6dpAADE60lEQVR4nOzdd3hUdfb48feUTHrvgYSEFhJKQhGkCQSkKS6uFZDq6voVdDH2/UqxLLYFQWHhZ2ddXbEgX9dFpIgiSJESBQIBQiABUkmvk8zc3x9hBmICpExyJ8l5Pc88j5m5c++ZRJI5c87nfDSKoigIIYQQQgghhGgSrdoBCCGEEEIIIURbIMmVEEIIIYQQQtiAJFdCCCGEEEIIYQOSXAkhhBBCCCGEDUhyJYQQQgghhBA2IMmVEEIIIYQQQtiAJFdCCCGEEEIIYQN6tQNoK6qqqjh06BCBgYFotZKzCiGEEK2B
 [...]
-      "text/plain": [
-       "<Figure size 640x480 with 2 Axes>"
-      ]
-     },
-     "metadata": {},
-     "output_type": "display_data"
-    }
-   ],
-   "source": [
-    "matplotlib.rc_file_defaults()\n",
-    "ax1 = sns.set_style(style=None, rc=None )\n",
-    "\n",
-    "fig, ax1 = plt.subplots()\n",
-    "plt.xticks(rotation=45, ha='right')\n",
-    "\n",
-    "\n",
-    "sns.lineplot(\n",
-    "    data=df, x='username', y='total_upvotes',\n",
-    "    marker='o', ax=ax1, label=\"Sum of upvotes\")\n",
-    "ax1.get_legend().remove()\n",
-    "\n",
-    "ax2 = ax1.twinx()\n",
-    "sns.barplot(data=df, x='username', y='num_posts',\n",
-    "            order=df['username'], alpha=0.5, ax=ax2, log=True,\n",
-    "            color=\"orange\", label=\"Number of posts\")\n",
-    "\n",
-    "\n",
-    "# ask matplotlib for the plotted objects and their labels\n",
-    "lines, labels = ax1.get_legend_handles_labels()\n",
-    "lines2, labels2 = ax2.get_legend_handles_labels()\n",
-    "ax2.legend(lines + lines2, labels + labels2, bbox_to_anchor=(1.55, 1))"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "You should see a correlation between total number of upvotes and total 
number of posts. In order to track user impact on a more equal footing, 
normalize the total number of upvotes relative to the total number of posts, 
and plot the result:"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 15,
-   "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "image/png": 
"iVBORw0KGgoAAAANSUhEUgAAAkAAAAHMCAYAAAA9ABcIAAAAOXRFWHRTb2Z0d2FyZQBNYXRwbG90bGliIHZlcnNpb24zLjcuMCwgaHR0cHM6Ly9tYXRwbG90bGliLm9yZy88F64QAAAACXBIWXMAAA9hAAAPYQGoP6dpAACLeElEQVR4nO3dd1xT5/cH8E/YIFNkyhQRRFFQHDjqVlyto1ZbrVpXbbXO2or9ujocbW3VDm1t1VpbW2frqHvvCU5EBGSDiuxNcn5/8MstKaAEEi5Jzvv1yktzc3PvuQGSk+c5z/NIiIjAGGOMMaZD9MQOgDHGGGOsrnECxBhjjDGdwwkQY4wxxnQOJ0CMMcYY0zmcADHGGGNM53ACxBhjjDGdwwkQY4wxxnSOgdgB1EcymQzJycmwsLCARCIROxzGGGOMVQMRIScnB87OztDTe34bDydAlUhOToarq6vYYTDGGGOs
 [...]
-      "text/plain": [
-       "<Figure size 640x480 with 1 Axes>"
-      ]
-     },
-     "metadata": {},
-     "output_type": "display_data"
-    }
-   ],
-   "source": [
-    "df['upvotes_normalized'] = df['total_upvotes']/df['num_posts']\n",
-    "\n",
-    "df.plot(x='username', y='upvotes_normalized', marker='o', 
color='green')\n",
-    "plt.xticks(rotation=45, ha='right')\n",
-    "plt.ylabel(\"Number of upvotes (normalized)\")\n",
-    "plt.gca().get_legend().remove()\n",
-    "plt.show()"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "You've been working with data taken at a single snapshot in time from 
when you ran the last query. Run the same query again, and store the output in 
`response2`, which you will compare with the previous results:"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 16,
-   "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "text/html": [
-       "<div class=\"druid\"><table>\n",
-       "<tr><th>num_posts</th><th>total_upvotes</th><th>username</th></tr>\n",
-       "<tr><td>404</td><td>28166</td><td>willow</td></tr>\n",
-       "<tr><td>418</td><td>29413</td><td>jojo</td></tr>\n",
-       "<tr><td>419</td><td>29202</td><td>mia</td></tr>\n",
-       "<tr><td>419</td><td>29456</td><td>miette</td></tr>\n",
-       "<tr><td>428</td><td>29472</td><td>gus</td></tr>\n",
-       "<tr><td>433</td><td>30160</td><td>milton</td></tr>\n",
-       "<tr><td>440</td><td>31212</td><td>leon</td></tr>\n",
-       "<tr><td>443</td><td>31063</td><td>rocket</td></tr>\n",
-       "</table></div>"
-      ],
-      "text/plain": [
-       "<IPython.core.display.HTML object>"
-      ]
-     },
-     "metadata": {},
-     "output_type": "display_data"
-    }
-   ],
-   "source": [
-    "response2 = sql_client.sql_query(sql)\n",
-    "response2.show()"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "Normalizing the data also helps you evaluate trends over time more 
consistently on the same plot axes. Plot the normalized data again, this time 
alongside the results from the previous snapshot:"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 17,
-   "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "image/png": 
"iVBORw0KGgoAAAANSUhEUgAAAkAAAAHMCAYAAAA9ABcIAAAAOXRFWHRTb2Z0d2FyZQBNYXRwbG90bGliIHZlcnNpb24zLjcuMCwgaHR0cHM6Ly9tYXRwbG90bGliLm9yZy88F64QAAAACXBIWXMAAA9hAAAPYQGoP6dpAAC6DklEQVR4nOzdd3iTZffA8W+SbroodNKWsilQoOxV9ihLkCUqAoKoiALixNetPxDfVwW3Iut9FZSpAlKWjLJX2WWVQgdtoYXuneT3R22ktkDTJk3Sns915ZI+ffI8J0ibk/s+97kVWq1WixBCCCFEDaI0dQBCCCGEEFVNEiAhhBBC1DiSAAkhhBCixpEESAghhBA1jiRAQgghhKhxJAESQgghRI0jCZAQQgghahwrUwdgjjQaDTdu3MDJyQmFQmHqcIQQQghRDlqtloyMDHx8fFAq7z/GIwlQGW7cuIGfn5+pwxBC
 [...]
-      "text/plain": [
-       "<Figure size 640x480 with 1 Axes>"
-      ]
-     },
-     "metadata": {},
-     "output_type": "display_data"
-    }
-   ],
-   "source": [
-    "df2 = pd.DataFrame(response2.json)\n",
-    "df2 = df2.sort_values('username')\n",
-    "df2['upvotes_normalized'] = df2['total_upvotes']/df2['num_posts']\n",
-    "\n",
-    "ax = df.plot(x='username', y='upvotes_normalized', marker='o', 
color='green', label=\"Time 1\")\n",
-    "df2.plot(x='username', y='upvotes_normalized', marker='o', 
color='purple', ax=ax, label=\"Time 2\")\n",
-    "plt.xticks(rotation=45, ha='right')\n",
-    "plt.ylabel(\"Number of upvotes (normalized)\")\n",
-    "plt.show()"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "This plot shows how some users maintain relatively consistent social 
media impact between the two query snapshots, whereas other users grow or 
decline in their influence.\n",
-    "\n",
-    "## Learn more\n",
-    "\n",
-    "This tutorial showed you how to create a Kafka topic using a Python 
client for Kafka, send a simulated stream of data to Kafka using a data 
generator, and query and visualize results over time. For more information, see 
the following resources:\n",
-    "\n",
-    "* [Apache Kafka 
ingestion](https://druid.apache.org/docs/latest/development/extensions-core/kafka-ingestion.html)\n",
-    "* [Querying 
data](https://druid.apache.org/docs/latest/tutorials/tutorial-query.html)\n",
-    "* [Tutorial: Run with 
Docker](https://druid.apache.org/docs/latest/tutorials/docker.html)"
-   ]
-  }
- ],
- "metadata": {
-  "kernelspec": {
-   "display_name": "Python 3 (ipykernel)",
-   "language": "python",
-   "name": "python3"
-  },
-  "language_info": {
-   "codemirror_mode": {
-    "name": "ipython",
-    "version": 3
-   },
-   "file_extension": ".py",
-   "mimetype": "text/x-python",
-   "name": "python",
-   "nbconvert_exporter": "python",
-   "pygments_lexer": "ipython3",
-   "version": "3.8.16"
-  },
-  "vscode": {
-   "interpreter": {
-    "hash": "a4289e5b8bae5973a6609d90f7bc464162478362b9a770893a3c5c597b0b36e7"
-   }
-  }
- },
- "nbformat": 4,
- "nbformat_minor": 4
-}
diff --git a/examples/quickstart/jupyter-notebooks/0-START-HERE.ipynb 
b/examples/quickstart/jupyter-notebooks/notebooks/01-introduction/00-START-HERE.ipynb
similarity index 91%
rename from examples/quickstart/jupyter-notebooks/0-START-HERE.ipynb
rename to 
examples/quickstart/jupyter-notebooks/notebooks/01-introduction/00-START-HERE.ipynb
index 0fdbad297f..0f89633c22 100644
--- a/examples/quickstart/jupyter-notebooks/0-START-HERE.ipynb
+++ 
b/examples/quickstart/jupyter-notebooks/notebooks/01-introduction/00-START-HERE.ipynb
@@ -66,7 +66,7 @@
     "topics and use a simple set of Python wrappers around the underlying REST 
API. The\n",
     "wrappers reside in the `druidapi` package within this directory. While 
the package\n",
     "can be used in any Python program, the key purpose, at present, is to 
support these\n",
-    "notebooks. See the [Introduction to the Druid Python 
API](Python_API_Tutorial.ipynb)\n",
+    "notebooks. See the [Introduction to the Druid Python 
API](01-druidapi-package-intro.ipynb)\n",
     "for an overview of the Python API."
    ]
   },
@@ -87,12 +87,12 @@
     "notebook directly, such as with `wget`, or manually through your web 
browser. Note\n",
     "that if you save the file from your web browser, make sure to remove the 
`.txt` extension.\n",
     "\n",
-    "- [Introduction to the Druid REST API](api-tutorial.ipynb) walks you 
through some of the\n",
+    "- [Introduction to the Druid REST 
API](../04-api/00-getting-started.ipynb) walks you through some of the\n",
     "  basics related to the Druid REST API and several endpoints.\n",
-    "- [Introduction to the Druid Python API](Python_API_Tutorial.ipynb) walks 
you through some of the\n",
+    "- [Introduction to the Druid Python API](01-druidapi-package-intro.ipynb) 
walks you through some of the\n",
     "  basics related to the Druid API using the Python wrapper API.\n",
-    "- [Learn the basics of Druid SQL](sql-tutorial.ipynb) introduces you to 
the unique aspects of Druid SQL with the primary focus on the SELECT statement. 
\n",
-    "- [Ingest and query data from Apache Kafka](kafka-tutorial.ipynb) walks 
you through ingesting an event stream from Kafka."
+    "- [Learn the basics of Druid 
SQL](../03-query/00-using-sql-with-druidapi.ipynb) introduces you to the unique 
aspects of Druid SQL with the primary focus on the SELECT statement. \n",
+    "- [Ingest and query data from Apache 
Kafka](../02-ingestion/01-streaming-from-kafka.ipynb) walks you through 
ingesting an event stream from Kafka."
    ]
   },
   {
@@ -154,7 +154,7 @@
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
-   "version": "3.8.16"
+   "version": "3.11.4"
   }
  },
  "nbformat": 4,
diff --git a/examples/quickstart/jupyter-notebooks/Python_API_Tutorial.ipynb 
b/examples/quickstart/jupyter-notebooks/notebooks/01-introduction/01-druidapi-package-intro.ipynb
similarity index 99%
rename from examples/quickstart/jupyter-notebooks/Python_API_Tutorial.ipynb
rename to 
examples/quickstart/jupyter-notebooks/notebooks/01-introduction/01-druidapi-package-intro.ipynb
index 851f1896db..f3b104b14f 100644
--- a/examples/quickstart/jupyter-notebooks/Python_API_Tutorial.ipynb
+++ 
b/examples/quickstart/jupyter-notebooks/notebooks/01-introduction/01-druidapi-package-intro.ipynb
@@ -217,16 +217,6 @@
     "The above shows the list of datasources by default. You'll get an empty 
result if you have no datasources yet."
    ]
   },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "616770ce",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "display.tables()"
-   ]
-  },
   {
    "cell_type": "markdown",
    "id": "7392e484",
@@ -491,7 +481,7 @@
    "source": [
     "## Datasource Client\n",
     "\n",
-    "The Datasource client lets you perform operations on datasource objects. 
The SQL layer allows you to get metadata and do queries. The datasource client 
works with the underlying segments. Explaining the full functionality is the 
topic of another notebook. For now, you can use the datasource client to clean 
up the datasource created above. The `True` argument asks for \"if exists\" 
semantics so you don't get an error if the datasource was alredy deleted."
+    "The Datasource client lets you perform operations on datasource objects. 
The SQL layer allows you to get metadata and do queries. The datasource client 
works with the underlying segments. Explaining the full functionality is the 
topic of another notebook. For now, you can use the datasource client to clean 
up the datasource created above. The `True` argument asks for \"if exists\" 
semantics so you don't get an error if the datasource was already deleted."
    ]
   },
   {
@@ -579,9 +569,7 @@
    "cell_type": "code",
    "execution_count": null,
    "id": "9e42dfbc",
-   "metadata": {
-    "scrolled": true
-   },
+   "metadata": {},
    "outputs": [],
    "source": [
     "rest_client.get_json('/status')"
@@ -741,7 +729,7 @@
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
-   "version": "3.8.16"
+   "version": "3.11.4"
   }
  },
  "nbformat": 4,
diff --git 
a/examples/quickstart/jupyter-notebooks/notebooks/02-ingestion/01-streaming-from-kafka.ipynb
 
b/examples/quickstart/jupyter-notebooks/notebooks/02-ingestion/01-streaming-from-kafka.ipynb
new file mode 100644
index 0000000000..6a62dbbd19
--- /dev/null
+++ 
b/examples/quickstart/jupyter-notebooks/notebooks/02-ingestion/01-streaming-from-kafka.ipynb
@@ -0,0 +1,537 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Ingest and query data from Apache Kafka\n",
+    "\n",
+    "<!--\n",
+    "  ~ Licensed to the Apache Software Foundation (ASF) under one\n",
+    "  ~ or more contributor license agreements.  See the NOTICE file\n",
+    "  ~ distributed with this work for additional information\n",
+    "  ~ regarding copyright ownership.  The ASF licenses this file\n",
+    "  ~ to you under the Apache License, Version 2.0 (the\n",
+    "  ~ \"License\"); you may not use this file except in compliance\n",
+    "  ~ with the License.  You may obtain a copy of the License at\n",
+    "  ~\n",
+    "  ~   http://www.apache.org/licenses/LICENSE-2.0\n",
+    "  ~\n",
+    "  ~ Unless required by applicable law or agreed to in writing,\n",
+    "  ~ software distributed under the License is distributed on an\n",
+    "  ~ \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+    "  ~ KIND, either express or implied.  See the License for the\n",
+    "  ~ specific language governing permissions and limitations\n",
+    "  ~ under the License.\n",
+    "  -->\n",
+    "\n",
+    "This tutorial introduces you to streaming ingestion in Apache Druid using 
the Apache Kafka event streaming platform.\n",
+    "Follow along to learn how to create and load data into a Kafka topic, 
start ingesting data from the topic into Druid, and query results over time. 
This tutorial assumes you have a basic understanding of Druid ingestion, 
querying, and API requests."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Table of contents\n",
+    "\n",
+    "* [Prerequisites](#Prerequisites)\n",
+    "* [Load Druid API client](#Load-Druid-API-client)\n",
+    "* [Create Kafka topic](#Create-Kafka-topic)\n",
+    "* [Load data into Kafka topic](#Load-data-into-Kafka-topic)\n",
+    "* [Start Druid ingestion](#Start-Druid-ingestion)\n",
+    "* [Query Druid datasource and visualize query 
results](#Query-Druid-datasource-and-visualize-query-results)\n",
+    "* [Learn more](#Learn-more)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Prerequisites\n",
+    "\n",
+    "This tutorial works with Druid 25.0.0 or later.\n",
+    "\n",
+    "Launch this tutorial and all prerequisites using the `all-services` 
profile of the Docker Compose file for Jupyter-based Druid tutorials. For more 
information, see [Docker for Jupyter Notebook 
tutorials](https://druid.apache.org/docs/latest/tutorials/tutorial-jupyter-docker.html).\n",
+    "\n",
+    "If you do not use the Docker Compose environment, you need the 
following:\n",
+    "* A running Druid instance.\n",
+    "   * Update the `druid_host` variable to point to your Router endpoint. 
For example, `druid_host = \"http://localhost:8888\"`.\n",
+    "   * Update the `rest_client` variable to point to your Coordinator 
endpoint. For example, `\"http://localhost:8081\"`.\n",
+    "* A running Kafka cluster.\n",
+    "   * Update the Kafka bootstrap servers to point to your servers. For 
example, `bootstrap_servers=[\"localhost:9092\"]`.\n",
+    "* The following Python packages:\n",
+    "   * `druidapi`, a Python client for Apache Druid\n",
+    "   * `DruidDataDriver`, a data generator\n",
+    "   * `kafka`, a Python client for Apache Kafka\n",
+    "   * `pandas`, `matplotlib`, and `seaborn` for data visualization\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Load Druid API client"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "To start the tutorial, run the following cell. It imports the required 
Python packages and defines a variable for the Druid client, and another for 
the SQL client used to run SQL commands."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import druidapi\n",
+    "import json\n",
+    "\n",
+    "# druid_host is the hostname and port for your Druid deployment. \n",
+    "# In the Docker Compose tutorial environment, this is the Router\n",
+    "# service running at \"http://router:8888\".\n",
+    "# If you are not using the Docker Compose environment, edit the 
`druid_host`.\n",
+    "\n",
+    "druid_host = \"http://router:8888\"\n",
+    "druid_host\n",
+    "\n",
+    "druid = druidapi.jupyter_client(druid_host)\n",
+    "display = druid.display\n",
+    "sql_client = druid.sql\n",
+    "\n",
+    "# Create a rest client for native JSON ingestion for streaming data\n",
+    "rest_client = druidapi.rest.DruidRestClient(\"http://coordinator:8081\")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Create Kafka topic"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "This notebook relies on the Python client for Apache Kafka. Import 
the Kafka producer and consumer modules, then create a Kafka client. You use 
the Kafka producer to create and publish records to a new topic named 
`social_media`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from kafka import KafkaProducer\n",
+    "from kafka import KafkaConsumer\n",
+    "\n",
+    "# Kafka runs on kafka:9092 in multi-container tutorial application\n",
+    "producer = KafkaProducer(bootstrap_servers='kafka:9092')\n",
+    "topic_name = \"social_media\""
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Create the `social_media` topic and send a sample event. The `send()` 
command returns a metadata descriptor for the record."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "event = {\n",
+    "    \"__time\": \"2023-01-03T16:40:21.501\",\n",
+    "    \"username\": \"willow\",\n",
+    "    \"post_title\": \"This title is required\",\n",
+    "    \"views\": 15284,\n",
+    "    \"upvotes\": 124,\n",
+    "    \"comments\": 21,\n",
+    "    \"edited\": \"True\"\n",
+    "}\n",
+    "\n",
+    "producer.send(topic_name, json.dumps(event).encode('utf-8'))"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "To verify that the Kafka topic stored the event, create a consumer client 
to read records from the Kafka cluster, and get the next (only) message:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "consumer = KafkaConsumer(topic_name, bootstrap_servers=['kafka:9092'], 
auto_offset_reset='earliest',\n",
+    "     enable_auto_commit=True)\n",
+    "\n",
+    "print(next(consumer).value.decode('utf-8'))"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Load data into Kafka topic"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Instead of manually creating events to send to the Kafka topic, use a 
data generator to simulate a continuous data stream. This tutorial makes use of 
Druid Data Driver to simulate a continuous data stream into the `social_media` 
Kafka topic. To learn more about the Druid Data Driver, see the Druid Summit 
talk, [Generating Time centric Data for Apache 
Druid](https://www.youtube.com/watch?v=3zAOeLe3iAo).\n",
+    "\n",
+    "In this notebook, you use a background process to continuously load data 
into the Kafka topic.\n",
+    "This allows you to keep executing commands in this notebook while data is 
constantly being streamed into the topic."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Run the following cells to load sample data into the `social_media` Kafka 
topic:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import multiprocessing as mp\n",
+    "from datetime import datetime\n",
+    "import DruidDataDriver"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def run_driver():\n",
+    "    DruidDataDriver.simulate(\"kafka_docker_config.json\", None, None, 
\"REAL\", datetime.now())\n",
+    "        \n",
+    "mp.set_start_method('fork')\n",
+    "ps = mp.Process(target=run_driver)\n",
+    "ps.start()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Start Druid ingestion"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Now that you have a new Kafka topic and data being streamed into the 
topic, you ingest the data into Druid by submitting a Kafka ingestion spec.\n",
+    "The ingestion spec describes the following:\n",
+    "* where to source the data to ingest (in `spec > ioConfig`),\n",
+    "* the datasource to ingest data into (in `spec > dataSchema > 
dataSource`), and\n",
+    "* what the data looks like (in `spec > dataSchema > dimensionsSpec`).\n",
+    "\n",
+    "Other properties control how Druid aggregates and stores data. For more 
information, see the Druid documentation:\n",
+    "* [Apache Kafka 
ingestion](https://druid.apache.org/docs/latest/development/extensions-core/kafka-ingestion.html)\n",
+    "* [Ingestion spec 
reference](https://druid.apache.org/docs/latest/ingestion/ingestion-spec.html)\n",
+    "\n",
+    "Run the following cells to define and view the Kafka ingestion spec."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "kafka_ingestion_spec = \"{\\\"type\\\": \\\"kafka\\\",\\\"spec\\\": 
{\\\"ioConfig\\\": {\\\"type\\\": \\\"kafka\\\",\\\"consumerProperties\\\": 
{\\\"bootstrap.servers\\\": \\\"kafka:9092\\\"},\\\"topic\\\": 
\\\"social_media\\\",\\\"inputFormat\\\": {\\\"type\\\": 
\\\"json\\\"},\\\"useEarliestOffset\\\": true},\\\"tuningConfig\\\": 
{\\\"type\\\": \\\"kafka\\\"},\\\"dataSchema\\\": {\\\"dataSource\\\": 
\\\"social_media\\\",\\\"timestampSpec\\\": {\\\"column\\\": 
\\\"__time\\\",\\\"for [...]
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "print(json.dumps(json.loads(kafka_ingestion_spec), indent=4))"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Send the spec to Druid to start the streaming ingestion from Kafka:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "headers = {\n",
+    "  'Content-Type': 'application/json'\n",
+    "}\n",
+    "\n",
+    "rest_client.post(\"/druid/indexer/v1/supervisor\", kafka_ingestion_spec, 
headers=headers)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "A `200` response indicates that the request was successful. You can view 
the running ingestion task and the new datasource in the web console at 
http://localhost:8888/unified-console.html."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Query Druid datasource and visualize query results"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "You can now query the new datasource called `social_media`. In this 
section, you also visualize query results using the Matplotlib and Seaborn 
visualization libraries. Run the following cell to import these packages."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import pandas as pd\n",
+    "import matplotlib\n",
+    "import matplotlib.pyplot as plt\n",
+    "import seaborn as sns"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Run a simple query to view a subset of rows from the new datasource:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "sql = '''\n",
+    "SELECT * FROM social_media LIMIT 5\n",
+    "'''\n",
+    "display.sql(sql)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "In this social media scenario, each incoming event represents a post on 
social media, for which you collect the timestamp, username, and post metadata. 
You are interested in analyzing the total number of upvotes for all posts, 
compared between users. Preview this data with the following query:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "sql = '''\n",
+    "SELECT\n",
+    "  COUNT(post_title) as num_posts,\n",
+    "  SUM(upvotes) as total_upvotes,\n",
+    "  username\n",
+    "FROM social_media\n",
+    "GROUP BY username\n",
+    "ORDER BY num_posts\n",
+    "'''\n",
+    "\n",
+    "response = sql_client.sql_query(sql)\n",
+    "response.show()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Visualize the total number of upvotes per user using a line plot. You 
sort the results by username before plotting because the order of users may 
vary as new results arrive."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "df = pd.DataFrame(response.json)\n",
+    "df = df.sort_values('username')\n",
+    "\n",
+    "df.plot(x='username', y='total_upvotes', marker='o')\n",
+    "plt.xticks(rotation=45, ha='right')\n",
+    "plt.ylabel(\"Total number of upvotes\")\n",
+    "plt.gca().get_legend().remove()\n",
+    "plt.show()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "The total number of upvotes likely depends on the total number of posts 
created per user. To better assess the relative impact per user, you compare 
the total number of upvotes (line plot) with the total number of posts."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "matplotlib.rc_file_defaults()\n",
+    "ax1 = sns.set_style(style=None, rc=None )\n",
+    "\n",
+    "fig, ax1 = plt.subplots()\n",
+    "plt.xticks(rotation=45, ha='right')\n",
+    "\n",
+    "\n",
+    "sns.lineplot(\n",
+    "    data=df, x='username', y='total_upvotes',\n",
+    "    marker='o', ax=ax1, label=\"Sum of upvotes\")\n",
+    "ax1.get_legend().remove()\n",
+    "\n",
+    "ax2 = ax1.twinx()\n",
+    "sns.barplot(data=df, x='username', y='num_posts',\n",
+    "            order=df['username'], alpha=0.5, ax=ax2, log=True,\n",
+    "            color=\"orange\", label=\"Number of posts\")\n",
+    "\n",
+    "\n",
+    "# ask matplotlib for the plotted objects and their labels\n",
+    "lines, labels = ax1.get_legend_handles_labels()\n",
+    "lines2, labels2 = ax2.get_legend_handles_labels()\n",
+    "ax2.legend(lines + lines2, labels + labels2, bbox_to_anchor=(1.55, 1))"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "You should see a correlation between total number of upvotes and total 
number of posts. In order to track user impact on a more equal footing, 
normalize the total number of upvotes relative to the total number of posts, 
and plot the result:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "df['upvotes_normalized'] = df['total_upvotes']/df['num_posts']\n",
+    "\n",
+    "df.plot(x='username', y='upvotes_normalized', marker='o', 
color='green')\n",
+    "plt.xticks(rotation=45, ha='right')\n",
+    "plt.ylabel(\"Number of upvotes (normalized)\")\n",
+    "plt.gca().get_legend().remove()\n",
+    "plt.show()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "You've been working with data taken at a single snapshot in time from 
when you ran the last query. Run the same query again, and store the output in 
`response2`, which you will compare with the previous results:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "response2 = sql_client.sql_query(sql)\n",
+    "response2.show()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Normalizing the data also helps you evaluate trends over time more 
consistently on the same plot axes. Plot the normalized data again, this time 
alongside the results from the previous snapshot:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "df2 = pd.DataFrame(response2.json)\n",
+    "df2 = df2.sort_values('username')\n",
+    "df2['upvotes_normalized'] = df2['total_upvotes']/df2['num_posts']\n",
+    "\n",
+    "ax = df.plot(x='username', y='upvotes_normalized', marker='o', 
color='green', label=\"Time 1\")\n",
+    "df2.plot(x='username', y='upvotes_normalized', marker='o', 
color='purple', ax=ax, label=\"Time 2\")\n",
+    "plt.xticks(rotation=45, ha='right')\n",
+    "plt.ylabel(\"Number of upvotes (normalized)\")\n",
+    "plt.show()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "This plot shows how some users maintain relatively consistent social 
media impact between the two query snapshots, whereas other users grow or 
decline in their influence.\n",
+    "\n",
+    "## Learn more\n",
+    "\n",
+    "This tutorial showed you how to create a Kafka topic using a Python 
client for Kafka, send a simulated stream of data to Kafka using a data 
generator, and query and visualize results over time. For more information, see 
the following resources:\n",
+    "\n",
+    "* [Apache Kafka 
ingestion](https://druid.apache.org/docs/latest/development/extensions-core/kafka-ingestion.html)\n",
+    "* [Querying 
data](https://druid.apache.org/docs/latest/tutorials/tutorial-query.html)\n",
+    "* [Tutorial: Run with 
Docker](https://druid.apache.org/docs/latest/tutorials/docker.html)"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3 (ipykernel)",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.11.4"
+  },
+  "vscode": {
+   "interpreter": {
+    "hash": "a4289e5b8bae5973a6609d90f7bc464162478362b9a770893a3c5c597b0b36e7"
+   }
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 4
+}
diff --git 
a/examples/quickstart/jupyter-notebooks/notebooks/02-ingestion/DruidDataDriver.py
 
b/examples/quickstart/jupyter-notebooks/notebooks/02-ingestion/DruidDataDriver.py
new file mode 100644
index 0000000000..5acd25210b
--- /dev/null
+++ 
b/examples/quickstart/jupyter-notebooks/notebooks/02-ingestion/DruidDataDriver.py
@@ -0,0 +1,1133 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# DruidDataDriver - generates JSON records as a workload for Apache Druid.
+#
+
+import argparse
+import dateutil.parser
+from datetime import datetime, timedelta
+import json
+import numpy as np
+import random
+import re
+from sortedcontainers import SortedList
+import string
+import sys
+import threading
+import time
+
+############################################################################
+#
+# DruidDataDriver simulates Druid workloads by producing JSON records.
+# Use a JSON config file to describe the characteristics of the workload
+# you want to simulate.
+#
+# Run the program as follows:
+# python DruidDataDriver.py <config file name> <options>
+# Options include:
+# -n <total number of records to generate>
+# -t <duration for generating records>
+#
+# See the associated documentation for the format of the config file.
+#
+############################################################################
+
+
+class FutureEvent:
+    """A wake-up scheduled for time ``t`` on behalf of the creating thread.
+
+    Instances compare by ``t`` only, so they can be stored in a sorted
+    container. ``pause`` blocks the caller on a private threading.Event
+    until some other thread calls ``resume``.
+    """
+
+    def __init__(self, t):
+        self.t = t
+        # Name of the thread that created the event; shown by __str__.
+        self.name = threading.current_thread().name
+        self.event = threading.Event()
+
+    def get_time(self):
+        return self.t
+
+    def get_name(self):
+        return self.name
+
+    def __lt__(self, other):
+        return self.t < other.t
+
+    def __eq__(self, other):
+        return self.t == other.t
+
+    def __le__(self, other):
+        return self.t <= other.t
+
+    def __gt__(self, other):
+        return self.t > other.t
+
+    def __ge__(self, other):
+        return self.t >= other.t
+
+    def __str__(self):
+        return 'FutureEvent('+self.name+', '+str(self.t)+')'
+
+    def pause(self):
+        """Block until resume() has been called on this event.
+
+        The flag is intentionally NOT cleared here. The event starts
+        unset, and clearing it right before waiting would discard a
+        resume() that was issued before this thread reached wait(),
+        leaving the thread blocked forever.
+        """
+        self.event.wait()
+
+    def resume(self):
+        """Release the thread blocked in (or about to enter) pause()."""
+        self.event.set()
+
+class Clock:
+    """Time source for the generator threads.
+
+    With ``time_type == 'SIM'`` the clock keeps its own ``sim_time`` and
+    threads that sleep are parked on FutureEvent objects; for any other
+    ``time_type`` it falls back to datetime.now() and time.sleep().
+    """
+    # NOTE: these are class attributes, so the event queue and the locks
+    # are shared by every Clock instance.
+    future_events = SortedList()
+    active_threads = 0
+    lock = threading.Lock()
+    sleep_lock = threading.Lock()
+
+    def __init__(self, time_type, start_time=None):
+        """
+        :param time_type: 'SIM' for simulated time, anything else for
+            real time.
+        :param start_time: initial simulated time; defaults to the moment
+            the Clock is constructed.
+        """
+        # Resolve the default per call. A ``datetime.now()`` default
+        # argument would be evaluated only once, at definition time.
+        if start_time is None:
+            start_time = datetime.now()
+        self.sim_time = start_time
+        self.time_type = time_type
+
+    def __str__(self):
+        parts = [str(self.sim_time)] + [str(e) for e in self.future_events]
+        return 'Clock(time=' + ', '.join(parts) + ')'
+
+    def activate_thread(self):
+        """Count the calling thread as running (simulated time only)."""
+        if self.time_type == 'SIM':
+            with self.lock:
+                self.active_threads += 1
+
+    def deactivate_thread(self):
+        """Stop counting the calling thread (simulated time only)."""
+        if self.time_type == 'SIM':
+            with self.lock:
+                self.active_threads -= 1
+
+    def end_thread(self):
+        """Deactivate the caller and wake the earliest pending event."""
+        if self.time_type == 'SIM':
+            with self.lock:
+                self.active_threads -= 1
+                if len(self.future_events) > 0:
+                    self.remove_event().resume()
+
+    def release_all(self):
+        """Resume every pending event without removing it."""
+        if self.time_type == 'SIM':
+            with self.lock:
+                for e in self.future_events:
+                    e.resume()
+
+    def add_event(self, future_t):
+        """Queue and return a FutureEvent for time ``future_t``."""
+        this_event = FutureEvent(future_t)
+        self.future_events.add(this_event)
+        return this_event
+
+    def remove_event(self):
+        """Pop and return the earliest queued event."""
+        next_event = self.future_events[0]
+        self.future_events.remove(next_event)
+        return next_event
+
+    def pause(self, event):
+        # Called with self.lock held (see sleep). The lock is dropped
+        # while the thread is parked and re-taken before returning.
+        self.active_threads -= 1
+        self.lock.release()
+        event.pause()
+        self.lock.acquire()
+        self.active_threads += 1
+
+    def resume(self, event):
+        event.resume()
+
+    def now(self):
+        """Return simulated time in 'SIM' mode, else datetime.now()."""
+        if self.time_type == 'SIM':
+            t = self.sim_time
+        else:
+            t = datetime.now()
+        return t
+
+    def sleep(self, delta):
+        """Sleep ``delta`` seconds of simulated or real time."""
+        if self.time_type != 'SIM':  # Real time
+            time.sleep(delta)
+            return
+
+        # Simulated time. The lock is acquired and released explicitly
+        # because pause() hands it over while this thread is parked.
+        self.lock.acquire()
+        this_event = self.add_event(
+            self.sim_time + timedelta(seconds=delta))
+        if self.active_threads == 1:
+            # This is the only running thread: take the earliest event.
+            next_event = self.remove_event()
+            if str(this_event) != str(next_event):
+                # Some other thread is due first; wake it and wait.
+                self.resume(next_event)
+                self.pause(this_event)
+        else:
+            self.pause(this_event)
+        self.sim_time = this_event.get_time()
+        self.lock.release()
+
+
+#
+# Set up the target
+#
+
+class PrintStdout:
+    """Target that writes each record to standard output."""
+    # Class-level lock: every instance serialises its writes through it.
+    lock = threading.Lock()
+
+    def __str__(self):
+        return '#printStdout()'
+
+    def print(self, record):
+        """Write ``str(record)`` as one line and flush stdout."""
+        self.lock.acquire()
+        try:
+            print(str(record))
+            sys.stdout.flush()
+        finally:
+            self.lock.release()
+
+class PrintFile:
+    """Target that writes each record as one line of a text file."""
+    # Class-level default so __del__ finds ``f`` even if open() failed.
+    f = None
+
+    def __init__(self, file_name):
+        self.file_name = file_name
+        self.f = open(file_name, 'w')
+
+    def __str__(self):
+        return 'PrintFile(file_name='+self.file_name+')'
+
+    def print(self, record):
+        """Write ``str(record)`` plus a newline and flush the file."""
+        line = str(record)+'\n'
+        self.f.write(line)
+        self.f.flush()
+
+    def __del__(self):
+        handle = self.f
+        if handle is not None:
+            handle.close()
+
+class PrintKafka:
+    """Target that sends each record to a Kafka topic as JSON."""
+    producer = None
+    topic = None
+
+    def __init__(self, endpoint, topic, security_protocol, compression_type):
+        # Imported here, so the kafka package is only needed when this
+        # target is actually constructed.
+        from kafka import KafkaProducer
+
+        def to_json_bytes(v):
+            return json.dumps(v).encode('utf-8')
+
+        self.endpoint = endpoint
+        self.producer = KafkaProducer(
+            bootstrap_servers=endpoint,
+            security_protocol=security_protocol,
+            compression_type=compression_type,
+            value_serializer=to_json_bytes)
+        self.topic = topic
+
+    def __str__(self):
+        return 'PrintKafka(endpoint='+self.endpoint+', topic='+self.topic+')'
+
+    def print(self, record):
+        """Parse ``str(record)`` as JSON and send the object to the topic."""
+        payload = json.loads(str(record))
+        self.producer.send(self.topic, payload)
+
+class PrintConfluent:
+    """Target that produces each record to Kafka via confluent_kafka.
+
+    The producer is configured for SASL_SSL with the PLAIN mechanism
+    using the supplied username and password.
+    """
+    producer = None
+    topic = None
+    username = None
+    password = None
+
+    def __init__(self, servers, topic, username, password):
+        # Imported here, so confluent_kafka is only needed when this
+        # target is actually constructed.
+        from confluent_kafka import Producer
+
+        self.servers = servers
+        self.producer = Producer({
+            'bootstrap.servers': servers,
+            'sasl.mechanisms': 'PLAIN',
+            'security.protocol': 'SASL_SSL',
+            'sasl.username': username,
+            'sasl.password': password
+        })
+        self.topic = topic
+        self.username = username
+        self.password = password
+
+    def __str__(self):
+        # The password is masked: this string may end up in logs or on
+        # the console and must not disclose the credential.
+        return ('PrintConfluent(servers='+self.servers
+                + ', topic='+self.topic
+                + ', username='+self.username
+                + ', password=********)')
+
+    def print(self, record):
+        """Echo the record to stdout, produce it, and flush the producer."""
+        print('producing '+str(record))
+        self.producer.produce(topic=self.topic, value=str(record))
+        self.producer.flush()
+
+
+#
+# Handle distributions
+#
+
+class DistConstant:
+    """Degenerate distribution: every sample is the configured value."""
+
+    def __init__(self, value):
+        self.value = value
+
+    def get_sample(self):
+        return self.value
+
+    def __str__(self):
+        return ''.join(['DistConstant(value=', str(self.value), ')'])
+
+class DistUniform:
+    """Uniform distribution over ``min_value`` .. ``max_value + 1``."""
+
+    def __init__(self, min_value, max_value):
+        self.min_value = min_value
+        self.max_value = max_value
+
+    def get_sample(self):
+        # The upper bound handed to numpy is one past max_value.
+        low = self.min_value
+        high = self.max_value+1
+        return np.random.uniform(low, high)
+
+    def __str__(self):
+        return ('DistUniform(min_value='+str(self.min_value)
+                + ', max_value='+str(self.max_value)+')')
+
+class DistExponential:
+    """Exponential distribution whose scale is the configured mean."""
+
+    def __init__(self, mean):
+        self.mean = mean
+
+    def get_sample(self):
+        scale = self.mean
+        return np.random.exponential(scale=scale)
+
+    def __str__(self):
+        return ''.join(['DistExponential(mean=', str(self.mean), ')'])
+
+class DistNormal:
+    """Normal distribution with the configured mean and stddev."""
+
+    def __init__(self, mean, stddev):
+        self.mean = mean
+        self.stddev = stddev
+
+    def get_sample(self):
+        mu, sigma = self.mean, self.stddev
+        return np.random.normal(mu, sigma)
+
+    def __str__(self):
+        return ('DistNormal(mean='+str(self.mean)
+                + ', stddev='+str(self.stddev)+')')
+
+def parse_distribution(desc):
+    """Build a distribution object from its config description.
+
+    ``desc['type']`` (case-insensitive) selects constant, uniform,
+    exponential or normal; the remaining keys supply the parameters.
+    An unknown type prints an error and exits the program.
+    """
+    kind = desc['type'].lower()
+    if kind == 'constant':
+        return DistConstant(desc['value'])
+    if kind == 'uniform':
+        low = desc['min']
+        high = desc['max']
+        return DistUniform(low, high)
+    if kind == 'exponential':
+        return DistExponential(desc['mean'])
+    if kind == 'normal':
+        mu = desc['mean']
+        sigma = desc['stddev']
+        return DistNormal(mu, sigma)
+    print('Error: Unknown distribution "'+kind+'"')
+    exit()
+    return None
+
+def parse_timestamp_distribution(desc):
+    """Build a distribution whose samples are POSIX timestamps.
+
+    Works like parse_distribution, except that 'value', 'min', 'max'
+    and 'mean' are ISO-8601 strings, converted to seconds since the
+    epoch with dateutil before being handed to the distribution.
+    'stddev' is passed through unchanged.
+    An unknown type prints an error and exits the program.
+    """
+    def to_epoch(text):
+        return dateutil.parser.isoparse(text).timestamp()
+
+    dist_type = desc['type'].lower()
+    dist_gen = None
+    if dist_type == 'constant':
+        dist_gen = DistConstant(to_epoch(desc['value']))
+    elif dist_type == 'uniform':
+        min_value = to_epoch(desc['min'])
+        max_value = to_epoch(desc['max'])
+        dist_gen = DistUniform(min_value, max_value)
+    elif dist_type == 'exponential':
+        dist_gen = DistExponential(to_epoch(desc['mean']))
+    elif dist_type == 'normal':
+        # The mean IS the parsed timestamp. Using it as a key into
+        # ``desc`` (as before) always raised KeyError.
+        mean = to_epoch(desc['mean'])
+        stddev = desc['stddev']
+        dist_gen = DistNormal(mean, stddev)
+    else:
+        print('Error: Unknown distribution "'+dist_type+'"')
+        exit()
+    return dist_gen
+
+
+#
+# Set up the dimensions for the emitters (see below)
+# There is one element class for each dimension type. This code creates a list 
of
+# elements and then runs through the list to create a single record.
+# Notice that the get_json_field_string() method produces the JSON dimension
+# field object based on the dimension configuration.
+# The get_stochastic_value() method is like a private method used to get a
+# random individual value.
+#
+
+class ElementNow: # The __time dimension
+    """Renders the clock's current time as the "__time" field."""
+
+    def __init__(self, global_clock):
+        self.global_clock = global_clock
+
+    def __str__(self):
+        return 'ElementNow()'
+
+    def get_json_field_string(self):
+        # Ask for millisecond precision explicitly. Slicing three
+        # characters off isoformat() cut into the seconds whenever the
+        # microsecond part was zero (isoformat() omits it then).
+        now = self.global_clock.now().isoformat(timespec='milliseconds')
+        return '"__time":"'+now+'"'
+
+class ElementCounter: # Counter dimensions
+    """Dimension whose value steps by ``increment`` from ``start``.
+
+    Optional config keys: percent_nulls, percent_missing (both 0-100),
+    start (default 0) and increment (default 1).
+    """
+
+    def __init__(self, desc):
+        self.name = desc['name']
+        # Percentages are stored as fractions for comparison with
+        # random.random().
+        self.percent_nulls = desc.get('percent_nulls', 0.0) / 100.0
+        self.percent_missing = desc.get('percent_missing', 0.0) / 100.0
+        self.start = desc.get('start', 0)
+        self.increment = desc.get('increment', 1)
+        self.value = self.start
+
+    def __str__(self):
+        # Non-default start/increment are appended, in that order.
+        parts = ['ElementCounter(name='+self.name]
+        if self.start != 0:
+            parts.append(str(self.start))
+        if self.increment != 1:
+            parts.append(str(self.increment))
+        return ', '.join(parts) + ')'
+
+    def get_stochastic_value(self):
+        """Return the current count, then advance it."""
+        current = self.value
+        self.value = current + self.increment
+        return current
+
+    def get_json_field_string(self):
+        """Render '"name":"<count>"', or a null with percent_nulls odds."""
+        if random.random() < self.percent_nulls:
+            return '"'+self.name+'": null'
+        return '"'+self.name+'":"'+str(self.get_stochastic_value())+'"'
+
+    def is_missing(self):
+        return random.random() < self.percent_missing
+
+
+class ElementEnum: # enumeration dimensions
+    """Dimension that picks one of the configured ``values``.
+
+    The index into the list is drawn from ``cardinality_distribution``
+    and clamped to the list bounds.
+    """
+
+    def __init__(self, desc):
+        self.name = desc['name']
+        self.percent_nulls = desc.get('percent_nulls', 0.0) / 100.0
+        self.percent_missing = desc.get('percent_missing', 0.0) / 100.0
+        self.cardinality = desc['values']
+        if 'cardinality_distribution' not in desc:
+            print('Element '+self.name+' specifies a cardinality without a '
+                  'cardinality distribution')
+            exit()
+        self.cardinality_distribution = parse_distribution(
+            desc['cardinality_distribution'])
+
+    def __str__(self):
+        return ('ElementEnum(name='+self.name
+                + ', cardinality='+str(self.cardinality)
+                + ', cardinality_distribution='
+                + str(self.cardinality_distribution)+')')
+
+    def get_stochastic_value(self):
+        """Sample an index, clamp it, and return that enum value."""
+        position = int(self.cardinality_distribution.get_sample())
+        position = min(max(position, 0), len(self.cardinality)-1)
+        return self.cardinality[position]
+
+    def get_json_field_string(self):
+        """Render '"name":"<value>"', or a null with percent_nulls odds."""
+        if random.random() < self.percent_nulls:
+            return '"'+self.name+'": null'
+        return '"'+self.name+'":"'+str(self.get_stochastic_value())+'"'
+
+    def is_missing(self):
+        return random.random() < self.percent_missing
+
+class ElementVariable: # Variable dimensions
+    """Dimension whose value is looked up in the caller's variables."""
+
+    def __init__(self, desc):
+        self.name = desc['name']
+        self.variable_name = desc['variable']
+
+    def __str__(self):
+        return ('ElementVariable(name='+self.name
+                + ', value='+self.variable_name+')')
+
+    # NOTE: because of timing, this method has a different signature than
+    # the other elements
+    def get_json_field_string(self, variables):
+        """Render '"name":"<variables[variable_name]>"'."""
+        current = variables[self.variable_name]
+        return '"'+self.name+'":"'+str(current)+'"'
+
+# TODO: Refactor ElementBase and subclasses, and those element classes that 
don't inherit from ElementBase
+class ElementBase: # Base class for the remainder of the dimensions
+    """Shared config handling and JSON rendering for value dimensions.
+
+    Subclasses provide get_stochastic_value(). With a non-zero
+    ``cardinality`` the constructor pre-generates that many distinct
+    values, and records pick among them via ``cardinality_distribution``.
+    """
+
+    def __init__(self, desc):
+        self.name = desc['name']
+        # Config percentages are divided by 100 and kept as fractions.
+        self.percent_nulls = desc.get('percent_nulls', 0.0) / 100.0
+        self.percent_missing = desc.get('percent_missing', 0.0) / 100.0
+
+        self.cardinality_setting = desc['cardinality']
+        self.cardinality_distribution = None
+
+        if self.cardinality_setting == 0:
+            # No fixed value pool: a new value is drawn for each record.
+            self.cardinality = None
+            return
+
+        self.cardinality = []
+        if 'cardinality_distribution' not in desc:
+            print('Element '+self.name+' specifies a cardinality without a '
+                  'cardinality distribution')
+            exit()
+        self.cardinality_distribution = parse_distribution(
+            desc['cardinality_distribution'])
+        self.init_cardinality()
+
+    def init_cardinality(self):
+        """Fill the value pool with ``cardinality_setting`` distinct values."""
+        for _ in range(self.cardinality_setting):
+            candidate = self.get_stochastic_value()
+            # Redraw until the value is not already in the pool.
+            while candidate in self.cardinality:
+                candidate = self.get_stochastic_value()
+            self.cardinality.append(candidate)
+
+    def get_stochastic_value(self):
+        # Placeholder; returns None unless a subclass overrides it.
+        return None
+
+    def get_json_field_string(self):
+        """Render '"name":<value>' (unquoted), or a null field."""
+        if random.random() < self.percent_nulls:
+            return '"'+self.name+'": null'
+        if self.cardinality is None:
+            chosen = self.get_stochastic_value()
+        else:
+            position = int(self.cardinality_distribution.get_sample())
+            # Clamp the sampled index into the pool's bounds.
+            position = min(max(position, 0), len(self.cardinality)-1)
+            chosen = self.cardinality[position]
+        return '"'+self.name+'":'+str(chosen)
+
+    def is_missing(self):
+        return random.random() < self.percent_missing
+
+
+class ElementString(ElementBase):
+    """String dimension built from random characters.
+
+    ``length_distribution`` gives the string length and ``chars`` the
+    alphabet (default: string.printable).
+    """
+
+    def __init__(self, desc):
+        # Set before ElementBase.__init__, which may already call
+        # get_stochastic_value() to fill the value pool.
+        self.length_distribution = parse_distribution(
+            desc['length_distribution'])
+        if 'chars' in desc:
+            self.chars = desc['chars']
+        else:
+            self.chars = string.printable
+        super().__init__(desc)
+
+    def __str__(self):
+        return ('ElementString(name='+self.name
+                + ', cardinality='+str(self.cardinality)
+                + ', cardinality_distribution='
+                + str(self.cardinality_distribution)
+                + ', chars='+self.chars+')')
+
+    def get_stochastic_value(self):
+        length = int(self.length_distribution.get_sample())
+        return ''.join(random.choices(list(self.chars), k=length))
+
+    def get_json_field_string(self):
+        """Render '"name":"<value>"' with the value JSON-escaped."""
+        if random.random() < self.percent_nulls:
+            return '"'+self.name+'": null'
+        if self.cardinality is None:
+            value = self.get_stochastic_value()
+        else:
+            index = int(self.cardinality_distribution.get_sample())
+            index = min(max(index, 0), len(self.cardinality)-1)
+            value = self.cardinality[index]
+        # json.dumps adds the surrounding quotes and escapes '"', '\\'
+        # and control characters, all of which string.printable contains.
+        # Plain text is emitted exactly as before.
+        return '"'+self.name+'":'+json.dumps(str(value), ensure_ascii=False)
+
+class ElementInt(ElementBase):
+    """Integer dimension; values are samples truncated with int()."""
+
+    def __init__(self, desc):
+        # Needed before ElementBase.__init__ runs, since that may sample.
+        self.value_distribution = parse_distribution(desc['distribution'])
+        super().__init__(desc)
+
+    def get_stochastic_value(self):
+        sample = self.value_distribution.get_sample()
+        return int(sample)
+
+    def __str__(self):
+        return ('ElementInt(name='+self.name
+                + ', value_distribution='+str(self.value_distribution)
+                + ', cardinality='+str(self.cardinality)
+                + ', cardinality_distribution='
+                + str(self.cardinality_distribution)+')')
+
+class ElementFloat(ElementBase):
+    """Float dimension with an optional fixed number of decimals."""
+
+    def __init__(self, desc):
+        # Needed before ElementBase.__init__ runs, since that may sample.
+        self.value_distribution = parse_distribution(desc['distribution'])
+        self.precision = desc['precision'] if 'precision' in desc else None
+        super().__init__(desc)
+
+    def __str__(self):
+        return ('ElementFloat(name='+self.name
+                + ', value_distribution='+str(self.value_distribution)
+                + ', cardinality='+str(self.cardinality)
+                + ', cardinality_distribution='
+                + str(self.cardinality_distribution)+')')
+
+    def get_stochastic_value(self):
+        sample = self.value_distribution.get_sample()
+        return float(sample)
+
+    def get_json_field_string(self):
+        """Render '"name":<number>', formatted to ``precision`` if set."""
+        if random.random() < self.percent_nulls:
+            return '"'+self.name+'": null'
+        if self.cardinality is None:
+            number = self.get_stochastic_value()
+        else:
+            position = int(self.cardinality_distribution.get_sample())
+            position = min(max(position, 0), len(self.cardinality)-1)
+            number = self.cardinality[position]
+        if self.precision is None:
+            rendered = str(number)
+        else:
+            # e.g. precision 2 gives the pattern '%.2f'
+            pattern = '%.'+str(self.precision)+'f'
+            rendered = str(pattern % number)
+        return '"'+self.name+'":'+rendered
+
+class ElementTimestamp(ElementBase):
+    """Timestamp dimension rendered as an ISO-8601 string."""
+
+    def __init__(self, desc):
+        # value_distribution was never assigned, so every use of this
+        # class failed with AttributeError. It has to exist before
+        # ElementBase.__init__, which may already sample values.
+        # NOTE(review): assumes the config key is 'distribution', as for
+        # the int/float/ipaddress elements - confirm against the docs.
+        self.value_distribution = parse_timestamp_distribution(
+            desc['distribution'])
+        super().__init__(desc)
+
+    def __str__(self):
+        return ('ElementTimestamp(name='+self.name
+                + ', value_distribution='+str(self.value_distribution)
+                + ', cardinality='+str(self.cardinality)
+                + ', cardinality_distribution='
+                + str(self.cardinality_distribution)+')')
+
+    def get_stochastic_value(self):
+        sample = self.value_distribution.get_sample()
+        # Request milliseconds explicitly: slicing isoformat() by three
+        # characters truncated the seconds for whole-second samples.
+        return datetime.fromtimestamp(sample).isoformat(
+            timespec='milliseconds')
+
+    def get_json_field_string(self):
+        """Render '"name":"<timestamp>"', or a null field."""
+        if random.random() < self.percent_nulls:
+            s = '"'+self.name+'": null'
+        else:
+            if self.cardinality is None:
+                value = self.get_stochastic_value()
+            else:
+                index = int(self.cardinality_distribution.get_sample())
+                if index < 0:
+                    index = 0
+                if index >= len(self.cardinality):
+                    index = len(self.cardinality)-1
+                value = self.cardinality[index]
+            s = '"'+self.name+'":"'+str(value)+'"'
+        return s
+
+    # is_missing() is inherited unchanged from ElementBase.
+
+class ElementIPAddress(ElementBase):
+    """Dimension that renders a sampled integer as a dotted quad."""
+
+    def __init__(self, desc):
+        # Needed before ElementBase.__init__ runs, since that may sample.
+        self.value_distribution = parse_distribution(desc['distribution'])
+        super().__init__(desc)
+
+    def __str__(self):
+        return ('ElementIPAddress(name='+self.name
+                + ', value_distribution='+str(self.value_distribution)
+                + ', cardinality='+str(self.cardinality)
+                + ', cardinality_distribution='
+                + str(self.cardinality_distribution)+')')
+
+    def get_stochastic_value(self):
+        number = int(self.value_distribution.get_sample())
+        # Bits 24-31 first, then 16-23, 8-15 and 0-7.
+        octets = [(number >> shift) & 0xFF for shift in (24, 16, 8, 0)]
+        return '.'.join(str(octet) for octet in octets)
+
+    def get_json_field_string(self):
+        """Render '"name":"<a.b.c.d>"', or a null field."""
+        if random.random() < self.percent_nulls:
+            return '"'+self.name+'": null'
+        if self.cardinality is None:
+            address = self.get_stochastic_value()
+        else:
+            position = int(self.cardinality_distribution.get_sample())
+            position = min(max(position, 0), len(self.cardinality)-1)
+            address = self.cardinality[position]
+        return '"'+self.name+'":"'+str(address)+'"'
+
+class ElementObject():
+    """Nested JSON object whose members are other dimension elements.
+
+    With a non-zero ``cardinality`` the constructor pre-renders that many
+    distinct instances and records pick among them.
+    """
+
+    def __init__(self, desc):
+        self.name = desc['name']
+        self.dimensions = get_variables(desc['dimensions'])
+        if 'percent_nulls' in desc.keys():
+            self.percent_nulls = desc['percent_nulls'] / 100.0
+        else:
+            self.percent_nulls = 0.0
+        if 'percent_missing' in desc.keys():
+            self.percent_missing = desc['percent_missing'] / 100.0
+        else:
+            self.percent_missing = 0.0
+        cardinality = desc['cardinality']
+        if cardinality == 0:
+            self.cardinality = None
+            self.cardinality_distribution = None
+        else:
+            self.cardinality = []
+            if 'cardinality_distribution' not in desc.keys():
+                print('Element '+self.name+' specifies a cardinality '
+                      'without a cardinality distribution')
+                exit()
+            self.cardinality_distribution = parse_distribution(
+                desc['cardinality_distribution'])
+            for i in range(cardinality):
+                # Redraw until the rendered instance is new to the pool.
+                value = None
+                while True:
+                    value = self.get_instance()
+                    if value not in self.cardinality:
+                        break
+                self.cardinality.append(value)
+
+    def __str__(self):
+        # join() avoids the stray leading comma the old loop produced.
+        members = ','.join(str(e) for e in self.dimensions)
+        return 'ElementObject(name='+self.name+', dimensions=['+members+'])'
+
+    def get_instance(self):
+        """Render '"name": {<member fields>}' from fresh member values."""
+        # join() keeps the braces intact when there are no members;
+        # stripping a trailing comma used to eat the opening brace.
+        fields = [e.get_json_field_string() for e in self.dimensions]
+        return '"'+self.name+'": {' + ','.join(fields) + '}'
+
+    def get_json_field_string(self):
+        """Return a fresh or pooled instance, or a null field."""
+        if random.random() < self.percent_nulls:
+            s = '"'+self.name+'": null'
+        else:
+            if self.cardinality is None:
+                s = self.get_instance()
+            else:
+                index = int(self.cardinality_distribution.get_sample())
+                if index < 0:
+                    index = 0
+                if index >= len(self.cardinality):
+                    index = len(self.cardinality)-1
+                s = self.cardinality[index]
+        return s
+
+    def is_missing(self):
+        return random.random() < self.percent_missing
+
+class ElementList():
+    """JSON array whose items are drawn from a list of elements.
+
+    ``length_distribution`` gives the number of items and
+    ``selection_distribution`` the index of the element used for each.
+    """
+
+    def __init__(self, desc):
+        self.name = desc['name']
+        self.elements = get_variables(desc['elements'])
+        self.length_distribution = parse_distribution(
+            desc['length_distribution'])
+        self.selection_distribution = parse_distribution(
+            desc['selection_distribution'])
+        if 'percent_nulls' in desc.keys():
+            self.percent_nulls = desc['percent_nulls'] / 100.0
+        else:
+            self.percent_nulls = 0.0
+        if 'percent_missing' in desc.keys():
+            self.percent_missing = desc['percent_missing'] / 100.0
+        else:
+            self.percent_missing = 0.0
+        cardinality = desc['cardinality']
+        if cardinality == 0:
+            self.cardinality = None
+            self.cardinality_distribution = None
+        else:
+            self.cardinality = []
+            if 'cardinality_distribution' not in desc.keys():
+                print('Element '+self.name+' specifies a cardinality '
+                      'without a cardinality distribution')
+                exit()
+            self.cardinality_distribution = parse_distribution(
+                desc['cardinality_distribution'])
+            for i in range(cardinality):
+                # Redraw until the rendered instance is new to the pool.
+                value = None
+                while True:
+                    value = self.get_instance()
+                    if value not in self.cardinality:
+                        break
+                self.cardinality.append(value)
+
+    def __str__(self):
+        s = 'ElementList(name='+self.name
+        s += ', length_distribution='+str(self.length_distribution)
+        s += ', selection_distribution='+str(self.selection_distribution)
+        s += ', elements=['+','.join(str(e) for e in self.elements)+'])'
+        return s
+
+    def get_instance(self):
+        """Render '"name": [<items>]' from freshly sampled items."""
+        length = int(self.length_distribution.get_sample())
+        # Clamp against the number of available elements. Clamping to
+        # the sampled length allowed indexes past the end of the list.
+        last = len(self.elements)-1
+        items = []
+        for i in range(length):
+            index = int(self.selection_distribution.get_sample())
+            index = min(max(index, 0), last)
+            field = self.elements[index].get_json_field_string()
+            # Drop everything up to the first ':' (the field name).
+            items.append(re.sub('^.*?:', '', field, count=1))
+        # join() keeps the brackets intact for an empty list.
+        return '"'+self.name+'": [' + ','.join(items) + ']'
+
+    def get_json_field_string(self):
+        """Return a fresh or pooled instance, or a null field."""
+        if random.random() < self.percent_nulls:
+            s = '"'+self.name+'": null'
+        else:
+            if self.cardinality is None:
+                s = self.get_instance()
+            else:
+                index = int(self.cardinality_distribution.get_sample())
+                if index < 0:
+                    index = 0
+                if index >= len(self.cardinality):
+                    index = len(self.cardinality)-1
+                s = self.cardinality[index]
+        return s
+
+    def is_missing(self):
+        return random.random() < self.percent_missing
+
+
+def parse_element(desc):
+    """Create the element object for one dimension description.
+
+    ``desc['type']`` (case-insensitive) selects the element class; an
+    unknown type prints an error and exits the program.
+    """
+    element_classes = {
+        'counter': ElementCounter,
+        'enum': ElementEnum,
+        'string': ElementString,
+        'int': ElementInt,
+        'float': ElementFloat,
+        'timestamp': ElementTimestamp,
+        'ipaddress': ElementIPAddress,
+        'variable': ElementVariable,
+        'object': ElementObject,
+        'list': ElementList,
+    }
+    element_class = element_classes.get(desc['type'].lower())
+    if element_class is None:
+        print('Error: Unknown dimension type "'+desc['type']+'"')
+        exit()
+    return element_class(desc)
+
+
+def get_variables(desc):
+    """Parse every description in ``desc`` and return the elements."""
+    return [parse_element(item) for item in desc]
+
+def get_dimensions(desc, global_clock):
+    """Parse ``desc`` and put the "__time" element in front of the result."""
+    parsed = get_variables(desc)
+    return [ElementNow(global_clock)] + parsed
+
+
+#
+# Set up the state machine
+#
+
+class Transition:
+    """One outgoing edge of a state: a target state and its probability."""
+
+    def __init__(self, next_state, probability):
+        self.next_state, self.probability = next_state, probability
+
+    def __str__(self):
+        return ('Transition(next_state='+str(self.next_state)
+                + ', probability='+str(self.probability)+')')
+
+def parse_transitions(desc):
+    """Turn a list of {'next', 'probability'} dicts into Transitions."""
+    return [
+        Transition(entry['next'], float(entry['probability']))
+        for entry in desc
+    ]
+
+class State:
+    """A state of the generator's state machine.
+
+    Keeps the state's dimensions, delay and variables, plus the names
+    and probabilities of the states it can move to.
+    """
+
+    def __init__(self, name, dimensions, delay, transitions, variables):
+        self.name = name
+        self.dimensions = dimensions
+        self.delay = delay
+        # Parallel lists, in the order the transitions were given.
+        self.transistion_states = [t.next_state for t in transitions]
+        self.transistion_probabilities = [
+            t.probability for t in transitions]
+        self.variables = variables
+
+    def __str__(self):
+        # ', ' now also separates the last field; it used to be fused
+        # onto the probabilities list.
+        return ('State(name='+self.name
+                + ', dimensions='+str([str(d) for d in self.dimensions])
+                + ', delay='+str(self.delay)
+                + ', transistion_states='+str(self.transistion_states)
+                + ', transistion_probabilities='
+                + str(self.transistion_probabilities)
+                + ', variables='+str([str(v) for v in self.variables])+')')
+
+    def get_next_state_name(self):
+        """Pick the next state's name, weighted by the probabilities."""
+        return random.choices(self.transistion_states,
+                              weights=self.transistion_probabilities,
+                              k=1)[0]
+
+class SimEnd:
+    """Decides when the run is over: by record count, runtime, or never."""
+    # Class attributes: the lock and the end event are shared by all
+    # SimEnd instances.
+    lock = threading.Lock()
+    thread_end_event = threading.Event()
+
+    def __init__(self, total_recs, runtime, global_clock):
+        """
+        :param total_recs: number of records to produce, or None.
+        :param runtime: duration such as '30s', '5m' or '2h', or None.
+        :param global_clock: clock used to wait out the runtime.
+        """
+        self.total_recs = total_recs
+        self.record_count = 0
+        self.global_clock = global_clock
+        if runtime is None:
+            self.t = None
+            return
+        # The last character is the unit, the rest the integer amount.
+        seconds_per_unit = {'s': 1, 'm': 60, 'h': 60 * 60}
+        unit = runtime[-1].lower()
+        if unit in seconds_per_unit:
+            self.t = int(runtime[:-1]) * seconds_per_unit[unit]
+        else:
+            print('Error: Unknown runtime value"'+runtime+'"')
+            exit()
+
+    def inc_rec_count(self):
+        """Count one record; signal the end once the target is reached."""
+        with self.lock:
+            self.record_count += 1
+        if self.total_recs is None:
+            return
+        if self.record_count >= self.total_recs:
+            self.thread_end_event.set()
+
+    def is_done(self):
+        """True once the record target is met or the runtime has elapsed."""
+        if self.total_recs is not None:
+            if self.record_count >= self.total_recs:
+                return True
+        return (self.t is not None) and self.thread_end_event.is_set()
+
+    def wait_for_end(self):
+        """Block the caller until the run should stop."""
+        if self.t is not None:
+            # Runtime limit: sleep on the clock, then raise the end flag.
+            clock = self.global_clock
+            clock.activate_thread()
+            clock.sleep(self.t)
+            self.thread_end_event.set()
+            clock.deactivate_thread()
+            return
+        if self.total_recs is not None:
+            # Record limit: wait for inc_rec_count() to raise the flag.
+            self.thread_end_event.wait()
+            self.global_clock.release_all()
+            return
+        # No limit given: keep the caller blocked indefinitely.
+        while True:
+            time.sleep(60)
+
+
+#
+# Run the driver
+#
+
+def create_record(dimensions, variables):
+    json_string = '{'
+    for element in dimensions:
+        if isinstance(element, ElementVariable):
+            json_string += element.get_json_field_string(variables) + ','
+        else:
+            if isinstance(element, ElementNow) or not element.is_missing():
+                json_string += element.get_json_field_string() + ','
+    json_string = json_string[:-1] + '}'
+    return json_string
+
+def set_variable_values(variables, dimensions):
+    for d in dimensions:
+        variables[d.name] = d.get_stochastic_value()
+
+def worker_thread(target_printer, states, initial_state, sim_end, 
global_clock):
+    # Process the state machine using worker threads
+    #print('Thread '+threading.current_thread().name+' starting...')
+    global_clock.activate_thread()
+    current_state = initial_state
+    variables = {}
+    while True:
+        set_variable_values(variables, current_state.variables)
+        record = create_record(current_state.dimensions, variables)
+        target_printer.print(record)
+        sim_end.inc_rec_count()
+        if sim_end.is_done():
+            break
+        delta = float(current_state.delay.get_sample())
+        global_clock.sleep(delta)
+        if sim_end.is_done():
+            break
+        next_state_name = current_state.get_next_state_name()
+        if next_state_name.lower() == 'stop':
+            break
+        current_state = states[next_state_name]
+
+    #print('Thread '+threading.current_thread().name+' done!')
+    global_clock.end_thread()
+
+def spawning_thread(target_printer, rate_delay, states, initial_state, 
sim_end, global_clock):
+    #print('Thread '+threading.current_thread().name+' starting...')
+    global_clock.activate_thread()
+    # Spawn the workers in a separate thread so we can stop the whole thing in 
the middle of spawning if necessary
+    count = 0
+    while not sim_end.is_done():
+        thread_name = 'W'+str(count)
+
+        count += 1
+        t = threading.Thread(target=worker_thread, args=(target_printer, 
states, initial_state, sim_end, global_clock, ), name=thread_name, daemon=True)
+        t.start()
+        global_clock.sleep(float(rate_delay.get_sample()))
+    global_clock.end_thread()
+    #print('Thread '+threading.current_thread().name+' done!')
+
+
+def simulate(config_file_name, runtime, total_recs, time_type, start_time):
+
+    if config_file_name:
+        with open(config_file_name, 'r') as f:
+            config = json.load(f)
+    else:
+        config = json.load(sys.stdin)
+
+    #
+    # Set up the gloabl clock
+    #
+
+    global_clock = Clock(time_type, start_time)
+    sim_end = SimEnd(total_recs, runtime, global_clock)
+
+
+    #
+    # Set up the output target
+    #
+
+    target = config['target']
+
+    if target['type'].lower() == 'stdout':
+        target_printer = PrintStdout()
+    elif target['type'].lower() == 'file':
+        path = target['path']
+        if path is None:
+            print('Error: File target requires a path item')
+            exit()
+        target_printer = PrintFile(path)
+    elif target['type'].lower() == 'kafka':
+        if 'endpoint' in target.keys():
+            endpoint = target['endpoint']
+        else:
+            print('Error: Kafka target requires an endpoint item')
+            exit()
+        if 'topic' in target.keys():
+            topic = target['topic']
+        else:
+            print('Error: Kafka target requires a topic item')
+            exit()
+        if 'security_protocol' in target.keys():
+            security_protocol = target['security_protocol']
+        else:
+            security_protocol = 'PLAINTEXT'
+        if 'compression_type' in target.keys():
+            compression_type = target['compression_type']
+        else:
+            compression_type = None
+        target_printer = PrintKafka(endpoint, topic, security_protocol, 
compression_type)
+    elif target['type'].lower() == 'confluent':
+        if 'servers' in target.keys():
+            servers = target['servers']
+        else:
+            print('Error: Conlfuent target requires a servers item')
+            exit()
+        if 'topic' in target.keys():
+            topic = target['topic']
+        else:
+            print('Error: Confluent target requires a topic item')
+            exit()
+        if 'username' in target.keys():
+            username = target['username']
+        else:
+            print('Error: Confluent target requires a username')
+            exit()
+        if 'password' in target.keys():
+            password = target['password']
+        else:
+            print('Error: Confluent target requires a password')
+            exit()
+        target_printer = PrintConfluent(servers, topic, username, password)
+    else:
+        print('Error: Unknown target type "'+target['type']+'"')
+        exit()
+
+    #sys.stderr.write('target='+str(target_printer)+'\n')
+
+
+    #
+    # Set up the interarrival rate
+    #
+
+    rate = config['interarrival']
+    rate_delay = parse_distribution(rate)
+
+    #sys.stderr.write('rate_delay='+str(rate_delay)+'\n')
+
+
+    #
+    # Set up emitters list
+    #
+
+    emitters = {}
+    for emitter in config['emitters']:
+        name = emitter['name']
+        dimensions = get_dimensions(emitter['dimensions'], global_clock)
+        emitters[name] = dimensions
+
+    #sys.stderr.write('emitters='+str(['(name='+str(key)+', 
dimensions='+str([str(e) for e in emitters[key]])+')' for key in 
emitters])+'\n')
+
+
+    #
+    # Set up the state machine
+    #
+
+    state_desc = config['states']
+    initial_state = None
+    states = {}
+    for state in state_desc:
+        name = state['name']
+        emitter_name = state['emitter']
+        if 'variables' not in state.keys():
+            variables = []
+        else:
+            variables = get_variables(state['variables'])
+        dimensions = emitters[emitter_name]
+        delay = parse_distribution(state['delay'])
+        transitions = parse_transitions(state['transitions'])
+        this_state = State(name, dimensions, delay, transitions, variables)
+        states[name] = this_state
+        if initial_state is None:
+            initial_state = this_state
+
+    #sys.stderr.write('states='+str(['('+str(key)+':'+str(states[key])+')' for 
key in states])+'\n')
+
+    #
+    # Finally, start the simulation
+    #
+
+    thrd = threading.Thread(target=spawning_thread, args=(target_printer, 
rate_delay, states, initial_state, sim_end, global_clock, ), name='Spawning', 
daemon=True)
+    thrd.start()
+    sim_end.wait_for_end()
+
+def main():
+
+    #
+    # Parse the command line
+    #
+
+    parser = argparse.ArgumentParser(description='Generates JSON records as a 
workload for Apache Druid.')
+    #parser.add_argument('config_file', metavar='<config file name>', 
help='the workload config file name')
+    parser.add_argument('-f', dest='config_file', nargs='?', help='the 
workload config file name')
+    parser.add_argument('-t', dest='time', nargs='?', help='the script runtime 
(may not be used with -n)')
+    parser.add_argument('-n', dest='n_recs', nargs='?', help='the number of 
records to generate (may not be used with -t)')
+    parser.add_argument('-s', dest='time_type', nargs='?', const='SIM', 
default='REAL', help='simulate time (default is real, not simulated)')
+
+    args = parser.parse_args()
+
+    config_file_name = args.config_file
+    runtime = args.time
+    total_recs = None
+    if args.n_recs is not None:
+        total_recs = int(args.n_recs)
+    time_type = args.time_type
+    if time_type == 'SIM':
+        start_time = datetime.now()
+    elif time_type == 'REAL':
+        start_time = datetime.now()
+    else:
+        start_time = dateutil.parser.isoparse(time_type)
+        time_type = 'SIM'
+
+
+    if (runtime is not None) and (total_recs is not None):
+        print("Use either -t or -n, but not both")
+        parser.print_help()
+        exit()
+
+
+    simulate(config_file_name, runtime, total_recs, time_type, start_time)
+
+
+
+if __name__ == "__main__":
+    main()
diff --git 
a/examples/quickstart/jupyter-notebooks/docker-jupyter/kafka_docker_config.json 
b/examples/quickstart/jupyter-notebooks/notebooks/02-ingestion/kafka_docker_config.json
similarity index 100%
rename from 
examples/quickstart/jupyter-notebooks/docker-jupyter/kafka_docker_config.json
rename to 
examples/quickstart/jupyter-notebooks/notebooks/02-ingestion/kafka_docker_config.json
diff --git a/examples/quickstart/jupyter-notebooks/sql-tutorial.ipynb 
b/examples/quickstart/jupyter-notebooks/notebooks/03-query/00-using-sql-with-druidapi.ipynb
similarity index 99%
rename from examples/quickstart/jupyter-notebooks/sql-tutorial.ipynb
rename to 
examples/quickstart/jupyter-notebooks/notebooks/03-query/00-using-sql-with-druidapi.ipynb
index b0ce6238e9..4d9349d8f2 100644
--- a/examples/quickstart/jupyter-notebooks/sql-tutorial.ipynb
+++ 
b/examples/quickstart/jupyter-notebooks/notebooks/03-query/00-using-sql-with-druidapi.ipynb
@@ -661,7 +661,7 @@
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
-   "version": "3.8.16"
+   "version": "3.11.4"
   },
   "toc-autonumbering": false,
   "toc-showcode": false,
diff --git a/examples/quickstart/jupyter-notebooks/api-tutorial.ipynb 
b/examples/quickstart/jupyter-notebooks/notebooks/04-api/00-getting-started.ipynb
similarity index 99%
rename from examples/quickstart/jupyter-notebooks/api-tutorial.ipynb
rename to 
examples/quickstart/jupyter-notebooks/notebooks/04-api/00-getting-started.ipynb
index b2616b5d8e..f2e920a51f 100644
--- a/examples/quickstart/jupyter-notebooks/api-tutorial.ipynb
+++ 
b/examples/quickstart/jupyter-notebooks/notebooks/04-api/00-getting-started.ipynb
@@ -681,7 +681,7 @@
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
-   "version": "3.8.16"
+   "version": "3.11.4"
   },
   "vscode": {
    "interpreter": {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to