This is an automated email from the ASF dual-hosted git repository.

victoria pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 353f7bed7f Adding data generation pod to jupyter notebooks deployment 
(#14742)
353f7bed7f is described below

commit 353f7bed7fa3b56e2e67cafcbc7556b868be38cd
Author: Sergio Ferragut <[email protected]>
AuthorDate: Thu Aug 10 15:43:05 2023 -0700

    Adding data generation pod to jupyter notebooks deployment (#14742)
    
    Co-authored-by: Charles Smith <[email protected]>
    Co-authored-by: Victoria Lim <[email protected]>
---
 examples/quickstart/jupyter-notebooks/Dockerfile   |   14 +-
 .../docker-jupyter/docker-compose-local.yaml       |   10 +
 .../docker-jupyter/docker-compose.yaml             |   10 +
 .../jupyter-notebooks/docker-jupyter/environment   |    4 +-
 .../jupyter-notebooks/druidapi/druidapi/display.py |   34 +
 .../jupyter-notebooks/druidapi/druidapi/sql.py     |   11 +-
 .../jupyter-notebooks/druidapi/druidapi/tasks.py   |   17 +-
 .../notebooks/01-introduction/00-START-HERE.ipynb  |    3 +-
 .../01-druidapi-package-intro.ipynb                |    4 +-
 .../01-introduction/02-datagen-intro.ipynb         |  642 +++++++++++
 .../02-ingestion/01-streaming-from-kafka.ipynb     |  255 +++--
 .../notebooks/02-ingestion/DruidDataDriver.py      | 1133 --------------------
 .../02-ingestion/kafka_docker_config.json          |   90 --
 13 files changed, 886 insertions(+), 1341 deletions(-)

diff --git a/examples/quickstart/jupyter-notebooks/Dockerfile 
b/examples/quickstart/jupyter-notebooks/Dockerfile
index 52e10175c2..57970e2cc0 100644
--- a/examples/quickstart/jupyter-notebooks/Dockerfile
+++ b/examples/quickstart/jupyter-notebooks/Dockerfile
@@ -37,7 +37,8 @@ RUN pip install requests \
     pip install seaborn \
     pip install bokeh \
     pip install kafka-python \
-    pip install sortedcontainers 
+    pip install sortedcontainers \
+    pip install tqdm
 
 # Install druidapi client from apache/druid
 # Local install requires sudo privileges 
@@ -46,12 +47,6 @@ ADD druidapi /home/jovyan/druidapi
 WORKDIR /home/jovyan/druidapi
 RUN pip install .
 
-
-
-# WIP -- install DruidDataDriver as a package
-# Import data generator and configuration file
-# Change permissions to allow import (requires sudo privileges)
-
 # The Jupyter notebooks themselves are mounted into the image's 
/home/jovyan/notebooks
 # path when running this image.
 RUN mkdir -p /home/jovyan/notebooks
@@ -59,8 +54,3 @@ RUN mkdir -p /home/jovyan/notebooks
 WORKDIR /home/jovyan/notebooks
 USER jovyan
 
-
-
-# Add location of the data generator to PYTHONPATH
-ENV PYTHONPATH "${PYTHONPATH}:/home/jovyan/notebooks/02-ingestion"
-
diff --git 
a/examples/quickstart/jupyter-notebooks/docker-jupyter/docker-compose-local.yaml
 
b/examples/quickstart/jupyter-notebooks/docker-jupyter/docker-compose-local.yaml
index 2525fc485c..aecb24bdaf 100644
--- 
a/examples/quickstart/jupyter-notebooks/docker-jupyter/docker-compose-local.yaml
+++ 
b/examples/quickstart/jupyter-notebooks/docker-jupyter/docker-compose-local.yaml
@@ -27,6 +27,7 @@ volumes:
   coordinator_var: {}
   router_var: {}
   druid_shared: {}
+  datagen_data: {}
 
 
 services:
@@ -175,3 +176,12 @@ services:
       - "${JUPYTER_PORT:-8889}:8888"
     volumes:
       - ../notebooks:/home/jovyan/notebooks
+
+  datagen:
+    image: imply/datagen:latest
+    container_name: datagen
+    profiles: ["jupyter", "kafka-jupyter", "druid-jupyter", "all-services"]
+    ports:
+      - "${DATAGEN_PORT:-9999}:9999"
+    volumes:
+      - datagen_data:/files
diff --git 
a/examples/quickstart/jupyter-notebooks/docker-jupyter/docker-compose.yaml 
b/examples/quickstart/jupyter-notebooks/docker-jupyter/docker-compose.yaml
index b0b2d206e5..910646bf24 100644
--- a/examples/quickstart/jupyter-notebooks/docker-jupyter/docker-compose.yaml
+++ b/examples/quickstart/jupyter-notebooks/docker-jupyter/docker-compose.yaml
@@ -27,6 +27,7 @@ volumes:
   coordinator_var: {}
   router_var: {}
   druid_shared: {}
+  datagen_data: {}
 
 
 services:
@@ -173,3 +174,12 @@ services:
       - "${JUPYTER_PORT:-8889}:8888"
     volumes:
       - ../notebooks:/home/jovyan/notebooks
+
+  datagen:
+    image: imply/datagen:latest
+    container_name: datagen
+    profiles: ["jupyter", "kafka-jupyter", "druid-jupyter", "all-services"]
+    ports:
+      - "${DATAGEN_PORT:-9999}:9999"
+    volumes:
+      - datagen_data:/files
diff --git a/examples/quickstart/jupyter-notebooks/docker-jupyter/environment 
b/examples/quickstart/jupyter-notebooks/docker-jupyter/environment
index c63a5c0e88..4b548f8d4b 100644
--- a/examples/quickstart/jupyter-notebooks/docker-jupyter/environment
+++ b/examples/quickstart/jupyter-notebooks/docker-jupyter/environment
@@ -39,8 +39,8 @@ druid_metadata_storage_connector_password=FoolishPassword
 
 druid_coordinator_balancer_strategy=cachingCost
 
-druid_indexer_runner_javaOptsArray=["-server", "-Xmx1g", "-Xms1g", 
"-XX:MaxDirectMemorySize=3g", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", 
"-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
-druid_indexer_fork_property_druid_processing_buffer_sizeBytes=256MiB
+druid_indexer_runner_javaOptsArray=["-server", "-Xmx256m", "-Xms256m", 
"-XX:MaxDirectMemorySize=324m", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", 
"-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
+druid_indexer_fork_property_druid_processing_buffer_sizeBytes=64MiB
 
 
 
diff --git a/examples/quickstart/jupyter-notebooks/druidapi/druidapi/display.py 
b/examples/quickstart/jupyter-notebooks/druidapi/druidapi/display.py
index 5b4368325c..e51bff70ce 100644
--- a/examples/quickstart/jupyter-notebooks/druidapi/druidapi/display.py
+++ b/examples/quickstart/jupyter-notebooks/druidapi/druidapi/display.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from druidapi import consts
+import time
 
 class DisplayClient:
     '''
@@ -144,3 +145,36 @@ class DisplayClient:
 
     def tables(self, schema=consts.DRUID_SCHEMA):
         self._druid.sql._tables_query(schema).show(display=self)
+
+    def run_task(self, query):
+        '''
+        Run an MSQ task while displaying progress in the cell output.
+        :param query: INSERT/REPLACE statement to run
+        :return: None
+        '''
+        from tqdm import tqdm
+
+        task = self._druid.sql.task(query)
+        with tqdm(total=100.0) as pbar:
+            previous_progress = 0.0
+            while True:
+                reports=task.reports_no_wait()
+                # check if progress metric is available and display it
+                if 'multiStageQuery' in reports.keys():
+                    if 'payload' in reports['multiStageQuery'].keys():
+                        if 'counters' in 
reports['multiStageQuery']['payload'].keys():
+                            if ('0' in 
reports['multiStageQuery']['payload']['counters'].keys() ) and \
+                               ('0' in 
reports['multiStageQuery']['payload']['counters']['0'].keys()):
+                                if 'progressDigest' in 
reports['multiStageQuery']['payload']['counters']['0']['0']['sortProgress'].keys():
+                                    current_progress = 
reports['multiStageQuery']['payload']['counters']['0']['0']['sortProgress']['progressDigest']*100.0
+                                    pbar.update( current_progress - 
previous_progress ) # update requires a relative value
+                                    previous_progress = current_progress
+                        # present status if available
+                        if 'status' in 
reports['multiStageQuery']['payload'].keys():
+                            pbar.set_description(f"Loading data, 
status:[{reports['multiStageQuery']['payload']['status']['status']}]")
+                            # stop when job is done
+                            if 
reports['multiStageQuery']['payload']['status']['status'] in ['SUCCESS', 
'FAILED']:
+                                break;
+                else:
+                    pbar.set_description('Initializing...')
+                time.sleep(1)
diff --git a/examples/quickstart/jupyter-notebooks/druidapi/druidapi/sql.py 
b/examples/quickstart/jupyter-notebooks/druidapi/druidapi/sql.py
index f38d0a994a..46bad764da 100644
--- a/examples/quickstart/jupyter-notebooks/druidapi/druidapi/sql.py
+++ b/examples/quickstart/jupyter-notebooks/druidapi/druidapi/sql.py
@@ -585,6 +585,9 @@ class QueryTaskResult:
             self._reports = self._tasks().task_reports(self._id)
         return self._reports
 
+    def reports_no_wait(self) -> dict:
+        return self._tasks().task_reports(self._id, require_ok=False)
+
     @property
     def results(self):
         if not self._results:
@@ -844,7 +847,7 @@ class QueryClient:
         '''
         return self._function_args_query(table_name).rows
 
-    def wait_until_ready(self, table_name):
+    def wait_until_ready(self, table_name, verify_load_status=True):
         '''
         Waits for a datasource to be loaded in the cluster, and to become 
available to SQL.
 
@@ -852,8 +855,12 @@ class QueryClient:
         ----------
         table_name str
             The name of a datasource in the 'druid' schema.
+        verify_load_status
+            If true, checks whether all published segments are loaded before 
testing query.
+            If false, tries the test query before checking whether all 
published segments are loaded.
         '''
-        self.druid_client.datasources.wait_until_ready(table_name)
+        if verify_load_status:
+            self.druid_client.datasources.wait_until_ready(table_name)
         while True:
             try:
                 self.sql('SELECT 1 FROM "{}" LIMIT 1'.format(table_name));
diff --git a/examples/quickstart/jupyter-notebooks/druidapi/druidapi/tasks.py 
b/examples/quickstart/jupyter-notebooks/druidapi/druidapi/tasks.py
index 0c428eda12..b5652ba6ab 100644
--- a/examples/quickstart/jupyter-notebooks/druidapi/druidapi/tasks.py
+++ b/examples/quickstart/jupyter-notebooks/druidapi/druidapi/tasks.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from druidapi.consts import OVERLORD_BASE
+import requests
 
 REQ_TASKS = OVERLORD_BASE + '/tasks'
 REQ_POST_TASK = OVERLORD_BASE + '/task'
@@ -112,7 +113,7 @@ class TaskClient:
         '''
         return self.client.get_json(REQ_TASK_STATUS, args=[task_id])
 
-    def task_reports(self, task_id) -> dict:
+    def task_reports(self, task_id, require_ok = True) -> dict:
         '''
         Retrieves the completion report for a completed task.
 
@@ -129,7 +130,19 @@ class TaskClient:
         ---------
         `GET /druid/indexer/v1/task/{taskId}/reports`
         '''
-        return self.client.get_json(REQ_TASK_REPORTS, args=[task_id])
+        if require_ok:
+            return self.client.get_json(REQ_TASK_REPORTS, args=[task_id])
+        else:
+            resp = self.client.get(REQ_TASK_REPORTS, args=[task_id], 
require_ok=require_ok)
+            if resp.status_code == requests.codes.ok:
+                try:
+                    result = resp.json()
+                except Exception as ex:
+                    result = {"message":"Payload could not be converted to 
json.", "payload":f"{resp.content}", "exception":f"{ex}"}
+                return result
+            else:
+                return {"message":f"Request return code:{resp.status_code}"}
+
 
     def submit_task(self, payload):
         '''
diff --git 
a/examples/quickstart/jupyter-notebooks/notebooks/01-introduction/00-START-HERE.ipynb
 
b/examples/quickstart/jupyter-notebooks/notebooks/01-introduction/00-START-HERE.ipynb
index 0f89633c22..d813dacae2 100644
--- 
a/examples/quickstart/jupyter-notebooks/notebooks/01-introduction/00-START-HERE.ipynb
+++ 
b/examples/quickstart/jupyter-notebooks/notebooks/01-introduction/00-START-HERE.ipynb
@@ -91,7 +91,8 @@
     "  basics related to the Druid REST API and several endpoints.\n",
     "- [Introduction to the Druid Python API](01-druidapi-package-intro.ipynb) 
walks you through some of the\n",
     "  basics related to the Druid API using the Python wrapper API.\n",
-    "- [Learn the basics of Druid 
SQL](../03-query/00-using-sql-with-druidapi.ipynb) introduces you to the unique 
aspects of Druid SQL with the primary focus on the SELECT statement. \n",
+    "- [Learn the basics of Druid 
SQL](../03-query/00-using-sql-with-druidapi.ipynb) introduces you to the unique 
aspects of Druid SQL with the primary focus on the SELECT statement.\n",
+    "- [Learn to use the Data Generator](./02-datagen-intro.ipynb) gets you 
started with streaming and batch file data generation for testing of any data 
schema.\n",
     "- [Ingest and query data from Apache 
Kafka](../02-ingestion/01-streaming-from-kafka.ipynb) walks you through 
ingesting an event stream from Kafka."
    ]
   },
diff --git 
a/examples/quickstart/jupyter-notebooks/notebooks/01-introduction/01-druidapi-package-intro.ipynb
 
b/examples/quickstart/jupyter-notebooks/notebooks/01-introduction/01-druidapi-package-intro.ipynb
index 6c943c4286..88b79fd8d9 100644
--- 
a/examples/quickstart/jupyter-notebooks/notebooks/01-introduction/01-druidapi-package-intro.ipynb
+++ 
b/examples/quickstart/jupyter-notebooks/notebooks/01-introduction/01-druidapi-package-intro.ipynb
@@ -445,7 +445,7 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "sql_client.run_task(sql)"
+    "display.run_task(sql)"
    ]
   },
   {
@@ -473,7 +473,7 @@
    "id": "11d9c95a",
    "metadata": {},
    "source": [
-    "`describe_table()` lists the columns in a table."
+    "`display.table(<table_name>)` lists the columns in a table."
    ]
   },
   {
diff --git 
a/examples/quickstart/jupyter-notebooks/notebooks/01-introduction/02-datagen-intro.ipynb
 
b/examples/quickstart/jupyter-notebooks/notebooks/01-introduction/02-datagen-intro.ipynb
new file mode 100644
index 0000000000..e3b3df2994
--- /dev/null
+++ 
b/examples/quickstart/jupyter-notebooks/notebooks/01-introduction/02-datagen-intro.ipynb
@@ -0,0 +1,642 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "id": "9e07b3f5-d919-4179-91a1-0f6b66c42757",
+   "metadata": {},
+   "source": [
+    "# Data Generator Server\n",
+    "<!--\n",
+    "  ~ Licensed to the Apache Software Foundation (ASF) under one\n",
+    "  ~ or more contributor license agreements.  See the NOTICE file\n",
+    "  ~ distributed with this work for additional information\n",
+    "  ~ regarding copyright ownership.  The ASF licenses this file\n",
+    "  ~ to you under the Apache License, Version 2.0 (the\n",
+    "  ~ \"License\"); you may not use this file except in compliance\n",
+    "  ~ with the License.  You may obtain a copy of the License at\n",
+    "  ~\n",
+    "  ~   http://www.apache.org/licenses/LICENSE-2.0\n";,
+    "  ~\n",
+    "  ~ Unless required by applicable law or agreed to in writing,\n",
+    "  ~ software distributed under the License is distributed on an\n",
+    "  ~ \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+    "  ~ KIND, either express or implied.  See the License for the\n",
+    "  ~ specific language governing permissions and limitations\n",
+    "  ~ under the License.\n",
+    "  -->\n",
+    "The default Docker Compose deployment includes a data generation service 
created from the published Docker image at `imply/datagen:latest`. \n",
+    "This image is built by the project 
https://github.com/implydata/druid-datagenerator. \n",
+    "\n",
+    "This notebook shows you how to use the data generation service included 
in the Docker Compose deployment. It explains how to use predefined data 
generator configurations as well as how to build a custom data generator. You 
will also learn how to create sample data files for batch ingestion and how to 
generate live streaming data for streaming ingestion.\n",
+    "\n",
+    "## Table of contents\n",
+    "\n",
+    "* [Initialization](#Initialization)\n",
+    "* [List available configurations](#List-available-configurations)\n",
+    "* [Generate a data file for backfilling 
history](#Generate-a-data-file-for-backfilling-history)\n",
+    "* [Batch ingestion of generated 
files](#Batch-ingestion-of-generated-files)\n",
+    "* [Generate custom data](#Generate-custom-data)\n",
+    "* [Stream generated data](#Stream-generated-data)\n",
+    "* [Ingest data from a stream](#Ingest-data-from-a-stream)\n",
+    "* [Cleanup](#Cleanup)\n",
+    "\n",
+    "\n",
+    "## Initialization\n",
+    "\n",
+    "To interact with the data generation service, use the REST client 
provided in the [`druidapi` Python 
package](https://druid.apache.org/docs/latest/tutorials/tutorial-jupyter-index.html#python-api-for-druid)."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "f84766c7-c6a5-4496-91a3-abdb8ddd2375",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import druidapi\n",
+    "import os\n",
+    "import time\n",
+    "\n",
+    "# Datagen client \n",
+    "datagen = druidapi.rest.DruidRestClient(\"http://datagen:9999\";)\n",
+    "\n",
+    "if (os.environ['DRUID_HOST'] == None):\n",
+    "    druid_host=f\"http://router:8888\"\n";,
+    "else:\n",
+    "    druid_host=f\"http://{os.environ['DRUID_HOST']}:8888\"\n",
+    "\n",
+    "# Druid client\n",
+    "druid = druidapi.jupyter_client(druid_host)\n",
+    "\n",
+    "\n",
+    "\n",
+    "# these imports and constants are used by multiple cells\n",
+    "from datetime import datetime, timedelta\n",
+    "import json\n",
+    "\n",
+    "headers = {\n",
+    "  'Content-Type': 'application/json'\n",
+    "}"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "c54af617-0998-4010-90c3-9b5a38a09a5f",
+   "metadata": {},
+   "source": [
+    "### List available configurations\n",
+    "Use the `/list` API endpoint to get the data generator's available 
configuration values with predefined data generator schemas."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "1ba6a80a-c49b-4abf-943b-9dad82f2ae13",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "display(datagen.get(f\"/list\", require_ok=False).json())"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "ae88a3b7-60da-405d-bcf4-fb4affcfe973",
+   "metadata": {},
+   "source": [
+    "### Generate a data file for backfilling history\n",
+    "When generating a file for backfill purposes, you can select the start 
time and the duration of the simulation.\n",
+    "\n",
+    "Configure the data generator request as follows:\n",
+    "* `name`: an arbitrary name you assign to the job. Refer to the job name 
to get the job status or to stop the job.\n",
+    "* `target.type`: \"file\" to generate a data file\n",
+    "* `target.path`: identifies the name of the file to generate. The data 
generator ignores any path specified and creates the file in the current 
working directory.\n",
+    "* `time_type`,`time`: The data generator simulates the time range you 
specify with a start timestamp in the `time_type` property and a duration in 
the `time` property. To specify `time`, use the `h` suffix for hours, `m` for 
minutes, and `s` for seconds.\n",
+    "- `concurrency` indicates the maximum number of entities used 
concurrently to generate events. Each entity is a separate state machine that 
simulates things like user sessions, IoT devices, or other concurrent sources 
of event data.\n",
+    "\n",
+    "The following example uses the `clickstream.json` predefined 
configuration to generate data into a file called `clicks.json`. The data 
generator starts the sample data at one hour prior to the current time and 
simulates events for a duration of one hour. Since it is simulated, it does 
this in just a few seconds."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "811ff58f-75af-4092-a08d-5e07a51592ff",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Configure the start time to one hour prior to the current time. \n",
+    "startDateTime = (datetime.now() - timedelta(hours = 
1)).strftime('%Y-%m-%dT%H:%M:%S.001')\n",
+    "print(f\"Starting to generate history at {startDateTime}.\")\n",
+    "\n",
+    "# Give the datagen job a name for use in subsequent API calls\n",
+    "job_name=\"gen_clickstream1\"\n",
+    "\n",
+    "# Generate a data file on the datagen server\n",
+    "datagen_request = {\n",
+    "    \"name\": job_name,\n",
+    "    \"target\": { \"type\": \"file\", \"path\":\"clicks.json\"},\n",
+    "    \"config_file\": \"clickstream/clickstream.json\", \n",
+    "    \"time_type\": startDateTime,\n",
+    "    \"time\": \"1h\",\n",
+    "    \"concurrency\":100\n",
+    "}\n",
+    "response = datagen.post(\"/start\", json.dumps(datagen_request), 
headers=headers, require_ok=False)\n",
+    "response.json()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "d407d1d9-3f01-4128-a014-6a5f371c25a5",
+   "metadata": {},
+   "source": [
+    "#### Display jobs\n",
+    "Use the `/jobs` API endpoint to get the current jobs and job statuses."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "3de698c5-bcf4-40c7-b295-728fb54d1f0a",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "display(datagen.get(f\"/jobs\").json())"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "972ebed0-34a1-4ad2-909d-69b8b27c3046",
+   "metadata": {},
+   "source": [
+    "#### Get status of a job\n",
+    "Use the `/status/JOB_NAME` API endpoint to get the current jobs and their 
status."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "debce4f8-9c16-476c-9593-21ec984985d2",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "display(datagen.get(f\"/status/{job_name}\", require_ok=False).json())"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "ef818d78-6aa6-4d38-8a43-83416aede96f",
+   "metadata": {},
+   "source": [
+    "#### Stop a job\n",
+    "Use the `/stop/JOB_NAME` API endpoint to stop a job."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "7631b8b8-d3d6-4803-9162-587f440d2ef2",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "display(datagen.post(f\"/stop/{job_name}\", '').json())"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "0a8dc7d3-64e5-41e3-8c28-c5f19c0536f5",
+   "metadata": {},
+   "source": [
+    "#### List files created on datagen server\n",
+    "Use the `/files` API endpoint to list files available on the server."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "06ee36bd-2d2b-4904-9987-10636cf52aac",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "display(datagen.get(f\"/files\", '').json())"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "83ef9edb-98e2-45b4-88e8-578703faedc1",
+   "metadata": {},
+   "source": [
+    "### Batch ingestion of generated files\n",
+    "Use a [Druid HTTP input 
source](https://druid.apache.org/docs/latest/ingestion/native-batch-input-sources.html#http-input-source)
 in the [EXTERN 
function](https://druid.apache.org/docs/latest/multi-stage-query/reference.html#extern-function)
 of a [SQL-based 
ingestion](https://druid.apache.org/docs/latest/multi-stage-query/index.html) 
to load generated files.\n",
+    "You can access files by name from within Druid using the URI 
`http://datagen:9999/file/FILE_NAME`. Alternatively, if you run Druid outside 
of Docker but on the same machine, access the file with 
`http://localhost:9999/file/FILE_NAME`.\n";,
+    "The following example assumes that both Druid and the data generator 
server are running in Docker Compose."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "0d72b015-f8ec-4713-b6f2-fe7a15afff59",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "sql = '''\n",
+    "REPLACE INTO \"clicks\" OVERWRITE ALL\n",
+    "WITH \"ext\" AS (SELECT *\n",
+    "FROM TABLE(\n",
+    "  EXTERN(\n",
+    "    
'{\"type\":\"http\",\"uris\":[\"http://datagen:9999/file/clicks.json\"]}',\n",
+    "    '{\"type\":\"json\"}'\n",
+    "  )\n",
+    ") EXTEND (\"time\" VARCHAR, \"user_id\" VARCHAR, \"event_type\" VARCHAR, 
\"client_ip\" VARCHAR, \"client_device\" VARCHAR, \"client_lang\" VARCHAR, 
\"client_country\" VARCHAR, \"referrer\" VARCHAR, \"keyword\" VARCHAR, 
\"product\" VARCHAR))\n",
+    "SELECT\n",
+    "  TIME_PARSE(\"time\") AS \"__time\",\n",
+    "  \"user_id\",\n",
+    "  \"event_type\",\n",
+    "  \"client_ip\",\n",
+    "  \"client_device\",\n",
+    "  \"client_lang\",\n",
+    "  \"client_country\",\n",
+    "  \"referrer\",\n",
+    "  \"keyword\",\n",
+    "  \"product\"\n",
+    "FROM \"ext\"\n",
+    "PARTITIONED BY DAY\n",
+    "'''  \n",
+    "\n",
+    "druid.display.run_task(sql)\n",
+    "print(\"Waiting for segment avaialbility ...\")\n",
+    "druid.sql.wait_until_ready('clicks')\n",
+    "print(\"Data is available for query.\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "b0997b38-02c2-483e-bd15-439c4bf0097a",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "sql = '''\n",
+    "SELECT  \"event_type\", \"user_id\", count( DISTINCT \"client_ip\") 
ip_count\n",
+    "FROM \"clicks\"\n",
+    "GROUP BY 1,2\n",
+    "ORDER BY 3 DESC\n",
+    "LIMIT 10\n",
+    "'''\n",
+    "druid.display.sql(sql)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "66ec013f-28e4-4d5a-94a6-06e0ed537b4e",
+   "metadata": {},
+   "source": [
+    "## Generate custom data\n",
+    "\n",
+    "You can find the full set of configuration options for the data generator 
in the 
[README](https://github.com/implydata/druid-datagenerator#data-generator-configuration).\n",
+    "\n",
+    "This section demonstrates a simple custom configuration as an example. 
Notice that the emitter defined the schema as a list of dimensions, each 
dimension specifies how its values are generated: "
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "d6451310-b7dd-4b39-a23b-7b735b152d6c",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "gen_config = {\n",
+    "  \"emitters\": [\n",
+    "    {\n",
+    "      \"name\": \"simple_record\",\n",
+    "      \"dimensions\": [\n",
+    "        {\n",
+    "          \"type\": \"string\",\n",
+    "          \"name\": \"random_string_column\",\n",
+    "          \"length_distribution\": {\n",
+    "            \"type\": \"constant\",\n",
+    "            \"value\": 13\n",
+    "          },\n",
+    "          \"cardinality\": 0,\n",
+    "          \"chars\": \"#.abcdefghijklmnopqrstuvwxyz\"\n",
+    "        },\n",
+    "        {\n",
+    "          \"type\": \"int\",\n",
+    "          \"name\": \"distributed_number\",\n",
+    "          \"distribution\": {\n",
+    "            \"type\": \"uniform\",\n",
+    "            \"min\": 0,\n",
+    "            \"max\": 1000\n",
+    "          },\n",
+    "          \"cardinality\": 10,\n",
+    "          \"cardinality_distribution\": {\n",
+    "            \"type\": \"exponential\",\n",
+    "            \"mean\": 5\n",
+    "          }\n",
+    "        }\n",
+    "      ]\n",
+    "    }\n",
+    "  ],\n",
+    "  \"interarrival\": {\n",
+    "    \"type\": \"constant\",\n",
+    "    \"value\": 1\n",
+    "  },\n",
+    "  \"states\": [\n",
+    "    {\n",
+    "      \"name\": \"state_1\",\n",
+    "      \"emitter\": \"simple_record\",\n",
+    "      \"delay\": {\n",
+    "        \"type\": \"constant\",\n",
+    "        \"value\": 1\n",
+    "      },\n",
+    "      \"transitions\": [\n",
+    "        {\n",
+    "          \"next\": \"state_1\",\n",
+    "          \"probability\": 1.0\n",
+    "        }\n",
+    "      ]\n",
+    "    }\n",
+    "  ]\n",
+    "}\n",
+    "\n",
+    "target = { \"type\":\"file\", \"path\":\"sample_data.json\"}"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "89a22645-aea5-4c15-b81a-959b27df731f",
+   "metadata": {},
+   "source": [
+    "This example uses the `config` attribute of the request to configure a 
new custom data generator instead of using a  predefined `config_file`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "e5e5c535-3474-42b4-9772-14279e712f3d",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# generate 1 hour of simulated time using custom configuration\n",
+    "datagen_request = {\n",
+    "    \"name\": \"sample_custom\",\n",
+    "    \"target\": target,\n",
+    "    \"config\": gen_config, \n",
+    "    \"time\": \"1h\",\n",
+    "    \"concurrency\":10,\n",
+    "    \"time_type\": \"SIM\"\n",
+    "}\n",
+    "response = datagen.post(\"/start\", json.dumps(datagen_request), 
headers=headers, require_ok=False)\n",
+    "response.json()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "952386f7-8181-4325-972b-5f30dc12cf21",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "display(datagen.get(f\"/jobs\", require_ok=False).json())"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "470b3a2a-4fd9-45a2-9221-497d906f62a9",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# display the first 1k characters of the generated data file\n",
+    "display( datagen.get(f\"/file/sample_data.json\").content[:1024])"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "350faea6-55b0-4386-830c-5160ae495012",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "datagen.post(f\"/stop/sample_custom\",'')"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "77bff054-0f16-4fd5-8ade-2d44b30d0cf2",
+   "metadata": {},
+   "source": [
+    "## Stream generated data\n",
+    "\n",
+    "The data generator works exactly the same whether it is writing data to a 
file or publishing messages into a stream. You  only need to change the target 
configuration.\n",
+    "\n",
+    "To use the Kafka container running on Docker Compose, use the host name 
`kafka:9092`. This tutorial uses the KAFKA_HOST environment variable from 
Docker Compose to specify the Kafka host. "
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "9959b7c3-6223-479d-b0c2-115a1c555090",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "if (os.environ['KAFKA_HOST'] == None):\n",
+    "    kafka_host=f\"kafka:9092\"\n",
+    "else:\n",
+    "    kafka_host=f\"{os.environ['KAFKA_HOST']}:9092\""
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "497abc18-6538-4536-a17f-fe10c4367611",
+   "metadata": {},
+   "source": [
+    "The simplest `target` object for Kafka and, similarly, Confluent is:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "686a74ab-e2dd-458e-9e93-10291064e9db",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "target = {\n",
+    "    \"type\":\"kafka\",\n",
+    "    \"endpoint\": kafka_host,\n",
+    "    \"topic\": \"custom_data\"\n",
+    "}\n",
+    "\n",
+    "# Generate 1 hour of real time using custom configuration, this means 
that this stream will run for an hour if not stopped\n",
+    "datagen_request = {\n",
+    "    \"name\": \"sample_custom\",\n",
+    "    \"target\": target,\n",
+    "    \"config\": gen_config, \n",
+    "    \"time\": \"1h\",\n",
+    "    \"concurrency\":10,\n",
+    "    \"time_type\": \"REAL\"\n",
+    "}\n",
+    "response = datagen.post(\"/start\", json.dumps(datagen_request), 
headers=headers, require_ok=False)\n",
+    "response.json()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "ec17d0c7-a3ab-4f37-bbf0-cc02bff44cf1",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "time.sleep(1) # avoid race condition of async job start\n",
+    "display(datagen.get(f\"/jobs\", require_ok=False).json())"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "84d7b706-9040-4a69-a956-1b1bbb037c32",
+   "metadata": {},
+   "source": [
+    "### Ingest data from a stream \n",
+    "This example shows how to start a streaming ingestion supervisor in 
Apache Druid to consume your custom data:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "51912409-e4e7-48d1-b3a5-b269622b4e56",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ingestion_spec ={\n",
+    "  \"type\": \"kafka\",\n",
+    "  \"spec\": {\n",
+    "    \"ioConfig\": {\n",
+    "      \"type\": \"kafka\",\n",
+    "      \"consumerProperties\": {\n",
+    "        \"bootstrap.servers\": \"kafka:9092\"\n",
+    "      },\n",
+    "      \"topic\": \"custom_data\",\n",
+    "      \"inputFormat\": {\n",
+    "        \"type\": \"json\"\n",
+    "      },\n",
+    "      \"useEarliestOffset\": True\n",
+    "    },\n",
+    "    \"tuningConfig\": {\n",
+    "      \"type\": \"kafka\",\n",
+    "      \"maxRowsInMemory\": 100000,\n",
+    "      \"resetOffsetAutomatically\": False\n",
+    "    },\n",
+    "    \"dataSchema\": {\n",
+    "      \"dataSource\": \"custom_data\",\n",
+    "      \"timestampSpec\": {\n",
+    "        \"column\": \"time\",\n",
+    "        \"format\": \"iso\"\n",
+    "      },\n",
+    "      \"dimensionsSpec\": {\n",
+    "        \"dimensions\": [\n",
+    "          \"random_string_column\",\n",
+    "          {\n",
+    "            \"type\": \"long\",\n",
+    "            \"name\": \"distributed_number\"\n",
+    "          }\n",
+    "        ]\n",
+    "      },\n",
+    "      \"granularitySpec\": {\n",
+    "        \"queryGranularity\": \"none\",\n",
+    "        \"rollup\": False,\n",
+    "        \"segmentGranularity\": \"hour\"\n",
+    "      }\n",
+    "    }\n",
+    "  }\n",
+    "}\n",
+    "\n",
+    "headers = {\n",
+    "  'Content-Type': 'application/json'\n",
+    "}\n",
+    "\n",
+    "druid.rest.post(\"/druid/indexer/v1/supervisor\", 
json.dumps(ingestion_spec), headers=headers)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "dddfb1cc-f863-4bf4-8c5a-b261b0b9c2f0",
+   "metadata": {},
+   "source": [
+    "Query the data on the stream, but first wait for its availability. It 
takes a bit of time for the streaming tasks to start, but once they are 
consuming you can see data very close to real time: Run the following cell 
multiple times to see how the data is changing:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "7e1284ed-5c49-4f37-81f7-c3b720473158",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "druid.sql.wait_until_ready('custom_data', verify_load_status=False)\n",
+    "druid.display.sql('''\n",
+    "SELECT SUM(distributed_number) sum_randoms, count(*) total_count\n",
+    "FROM custom_data\n",
+    "''')"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "4486e430-0776-46ad-8a8b-4f0354f17bfb",
+   "metadata": {},
+   "source": [
+    "### Cleanup\n",
+    "\n",
+    "Stop the streaming ingestion and the streaming producer:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "38943a92-dc23-41cf-91a4-1b68d2178033",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "print(f\"Stop streaming generator: 
[{datagen.post('/stop/sample_custom','',require_ok=False)}]\")\n",
+    "print(f'Reset offsets for streaming ingestion: 
[{druid.rest.post(\"/druid/indexer/v1/supervisor/custom_data/reset\",\"\", 
require_ok=False)}]')\n",
+    "print(f'Stop streaming ingestion: 
[{druid.rest.post(\"/druid/indexer/v1/supervisor/custom_data/terminate\",\"\", 
require_ok=False)}]')"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "0cf53bdc-de7f-425d-84b1-68d0cef420d8",
+   "metadata": {},
+   "source": [
+    "Wait for streaming ingestion to complete and then remove the custom data 
table:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "87341e7c-f7ab-488c-9913-091f712534cb",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "print(f\"Drop datasource: [{druid.datasources.drop('custom_data')}]\")"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3 (ipykernel)",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.11.4"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git 
a/examples/quickstart/jupyter-notebooks/notebooks/02-ingestion/01-streaming-from-kafka.ipynb
 
b/examples/quickstart/jupyter-notebooks/notebooks/02-ingestion/01-streaming-from-kafka.ipynb
index 6a62dbbd19..fc36b4b19a 100644
--- 
a/examples/quickstart/jupyter-notebooks/notebooks/02-ingestion/01-streaming-from-kafka.ipynb
+++ 
b/examples/quickstart/jupyter-notebooks/notebooks/02-ingestion/01-streaming-from-kafka.ipynb
@@ -4,7 +4,7 @@
    "cell_type": "markdown",
    "metadata": {},
    "source": [
-    "Ingest and query data from Apache Kafka\n",
+    "# Ingest and query data from Apache Kafka\n",
     "\n",
     "<!--\n",
     "  ~ Licensed to the Apache Software Foundation (ASF) under one\n",
@@ -60,9 +60,10 @@
     "   * Update the `rest_client` variable to point to your Coordinator 
endpoint. For example, `\"http://localhost:8081\"`.\n";,
     "* A running Kafka cluster.\n",
     "   * Update the Kafka bootstrap servers to point to your servers. For 
example, `bootstrap_servers=[\"localhost:9092\"]`.\n",
+    "* A running [Data Generator 
server](https://github.com/implydata/druid-datagenerator) accessible to the 
cluster.\n",
+    "   * Update the data generator client. For example `datagen = 
druidapi.rest.DruidRestClient(\"http://localhost:9999\";)`.\n",
     "* The following Python packages:\n",
     "   * `druidapi`, a Python client for Apache Druid\n",
-    "   * `DruidDataDriver`, a data generator\n",
     "   * `kafka`, a Python client for Apache Kafka\n",
     "   * `pandas`, `matplotlib`, and `seaborn` for data visualization\n"
    ]
@@ -88,36 +89,16 @@
    "outputs": [],
    "source": [
     "import druidapi\n",
-    "import json\n",
-    "\n",
-    "# druid_host is the hostname and port for your Druid deployment. \n",
-    "# In the Docker Compose tutorial environment, this is the Router\n",
-    "# service running at \"http://router:8888\".\n";,
-    "# If you are not using the Docker Compose environment, edit the 
`druid_host`.\n",
+    "import os\n",
+    "import time\n",
     "\n",
-    "druid_host = \"http://router:8888\"\n";,
-    "druid_host\n",
-    "\n",
-    "druid = druidapi.jupyter_client(druid_host)\n",
-    "display = druid.display\n",
-    "sql_client = druid.sql\n",
-    "\n",
-    "# Create a rest client for native JSON ingestion for streaming data\n",
-    "rest_client = druidapi.rest.DruidRestClient(\"http://coordinator:8081\";)"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "## Create Kafka topic"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "This notebook relies on the Python client for the Apache Kafka. Import 
the Kafka producer and consumer modules, then create a Kafka client. You use 
the Kafka producer to create and publish records to a new topic named 
`social_media`."
+    "if 'DRUID_HOST' not in os.environ.keys():\n",
+    "    druid_host=f\"http://localhost:8888\"\n";,
+    "else:\n",
+    "    druid_host=f\"http://{os.environ['DRUID_HOST']}:8888\"\n",
+    "    \n",
+    "print(f\"Opening a connection to {druid_host}.\")\n",
+    "druid = druidapi.jupyter_client(druid_host)"
    ]
   },
   {
@@ -126,81 +107,82 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "from kafka import KafkaProducer\n",
-    "from kafka import KafkaConsumer\n",
+    "# Use kafka_host variable when connecting to kafka \n",
+    "if 'KAFKA_HOST' not in os.environ.keys():\n",
+    "   kafka_host=f\"http://localhost:9092\"\n";,
+    "else:\n",
+    "    kafka_host=f\"{os.environ['KAFKA_HOST']}:9092\"\n",
     "\n",
-    "# Kafka runs on kafka:9092 in multi-container tutorial application\n",
-    "producer = KafkaProducer(bootstrap_servers='kafka:9092')\n",
+    "# this is the kafka topic we will be working with:\n",
     "topic_name = \"social_media\""
    ]
   },
-  {
-   "cell_type": "markdown",
-   "metadata": {},
-   "source": [
-    "Create the `social_media` topic and send a sample event. The `send()` 
command returns a metadata descriptor for the record."
-   ]
-  },
   {
    "cell_type": "code",
    "execution_count": null,
    "metadata": {},
    "outputs": [],
    "source": [
-    "event = {\n",
-    "    \"__time\": \"2023-01-03T16:40:21.501\",\n",
-    "    \"username\": \"willow\",\n",
-    "    \"post_title\": \"This title is required\",\n",
-    "    \"views\": 15284,\n",
-    "    \"upvotes\": 124,\n",
-    "    \"comments\": 21,\n",
-    "    \"edited\": \"True\"\n",
-    "}\n",
+    "import json\n",
     "\n",
-    "producer.send(topic_name, json.dumps(event).encode('utf-8'))"
+    "# shortcuts for display and sql api's\n",
+    "display = druid.display\n",
+    "sql_client = druid.sql\n",
+    "\n",
+    "# client for Data Generator API\n",
+    "datagen = druidapi.rest.DruidRestClient(\"http://datagen:9999\";)\n",
+    "\n",
+    "# client for Druid API\n",
+    "rest_client = druid.rest"
    ]
   },
   {
    "cell_type": "markdown",
    "metadata": {},
    "source": [
-    "To verify that the Kafka topic stored the event, create a consumer client 
to read records from the Kafka cluster, and get the next (only) message:"
+    "## Publish generated data directly to Kafka topic"
    ]
   },
   {
-   "cell_type": "code",
-   "execution_count": null,
+   "cell_type": "markdown",
    "metadata": {},
-   "outputs": [],
    "source": [
-    "consumer = KafkaConsumer(topic_name, bootstrap_servers=['kafka:9092'], 
auto_offset_reset='earliest',\n",
-    "     enable_auto_commit=True)\n",
-    "\n",
-    "print(next(consumer).value.decode('utf-8'))"
+    "In this section, you use the data generator included as part of the 
Docker application to generate a stream of messages. The data generator creates 
and send messages to a Kafka topic named `social_media`. To learn more about 
the Druid Data Generator, see the 
[project](https://github.com/implydata/druid-datagenerator) and the [data 
generation notebook](../01-introduction/02-datagen-intro.ipynb)."
    ]
   },
   {
    "cell_type": "markdown",
    "metadata": {},
    "source": [
-    "## Load data into Kafka topic"
+    "### Generate data\n",
+    "Run the following cells to load sample data into the `social_media` Kafka 
topic. The data generator sends events until it reaches 50,000 messages."
    ]
   },
   {
-   "cell_type": "markdown",
+   "cell_type": "code",
+   "execution_count": null,
    "metadata": {},
+   "outputs": [],
    "source": [
-    "Instead of manually creating events to send to the Kafka topic, use a 
data generator to simulate a continuous data stream. This tutorial makes use of 
Druid Data Driver to simulate a continuous data stream into the `social_media` 
Kafka topic. To learn more about the Druid Data Driver, see the Druid Summit 
talk, [Generating Time centric Data for Apache 
Druid](https://www.youtube.com/watch?v=3zAOeLe3iAo).\n",
+    "headers = {\n",
+    "  'Content-Type': 'application/json'\n",
+    "}\n",
     "\n",
-    "In this notebook, you use a background process to continuously load data 
into the Kafka topic.\n",
-    "This allows you to keep executing commands in this notebook while data is 
constantly being streamed into the topic."
+    "datagen_request = {\n",
+    "    \"name\": \"social_stream\",\n",
+    "    \"target\": { \"type\": \"kafka\", \"endpoint\": kafka_host, 
\"topic\": topic_name  },\n",
+    "    \"config_file\": \"social/social_posts.json\", \n",
+    "    \"total_events\":50000,\n",
+    "    \"concurrency\":100\n",
+    "}\n",
+    "datagen.post(\"/start\", json.dumps(datagen_request), headers=headers)\n"
    ]
   },
   {
    "cell_type": "markdown",
    "metadata": {},
    "source": [
-    "Run the following cells to load sample data into the `social_media` Kafka 
topic:"
+    "Check the status of the job with the following cell:"
    ]
   },
   {
@@ -209,23 +191,9 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "import multiprocessing as mp\n",
-    "from datetime import datetime\n",
-    "import DruidDataDriver"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "def run_driver():\n",
-    "    DruidDataDriver.simulate(\"kafka_docker_config.json\", None, None, 
\"REAL\", datetime.now())\n",
-    "        \n",
-    "mp.set_start_method('fork')\n",
-    "ps = mp.Process(target=run_driver)\n",
-    "ps.start()"
+    "time.sleep(1) # avoid race between start of the job and its status being 
available\n",
+    "response = datagen.get('/status/social_stream')\n",
+    "response.json()"
    ]
   },
   {
@@ -258,16 +226,56 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "kafka_ingestion_spec = \"{\\\"type\\\": \\\"kafka\\\",\\\"spec\\\": 
{\\\"ioConfig\\\": {\\\"type\\\": \\\"kafka\\\",\\\"consumerProperties\\\": 
{\\\"bootstrap.servers\\\": \\\"kafka:9092\\\"},\\\"topic\\\": 
\\\"social_media\\\",\\\"inputFormat\\\": {\\\"type\\\": 
\\\"json\\\"},\\\"useEarliestOffset\\\": true},\\\"tuningConfig\\\": 
{\\\"type\\\": \\\"kafka\\\"},\\\"dataSchema\\\": {\\\"dataSource\\\": 
\\\"social_media\\\",\\\"timestampSpec\\\": {\\\"column\\\": 
\\\"__time\\\",\\\"for [...]
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "print(json.dumps(json.loads(kafka_ingestion_spec), indent=4))"
+    "kafka_ingestion_spec = {\n",
+    "    \"type\": \"kafka\",\n",
+    "    \"spec\": {\n",
+    "        \"ioConfig\": {\n",
+    "            \"type\": \"kafka\",\n",
+    "            \"consumerProperties\": {\n",
+    "                \"bootstrap.servers\": \"kafka:9092\"\n",
+    "            },\n",
+    "            \"topic\": \"social_media\",\n",
+    "            \"inputFormat\": {\n",
+    "                \"type\": \"json\"\n",
+    "            },\n",
+    "            \"useEarliestOffset\": True\n",
+    "        },\n",
+    "        \"tuningConfig\": {\n",
+    "            \"type\": \"kafka\"\n",
+    "        },\n",
+    "        \"dataSchema\": {\n",
+    "            \"dataSource\": \"social_media\",\n",
+    "            \"timestampSpec\": {\n",
+    "                \"column\": \"time\",\n",
+    "                \"format\": \"iso\"\n",
+    "            },\n",
+    "            \"dimensionsSpec\": {\n",
+    "                \"dimensions\": [\n",
+    "                    \"username\",\n",
+    "                    \"post_title\",\n",
+    "                    {\n",
+    "                        \"type\": \"long\",\n",
+    "                        \"name\": \"views\"\n",
+    "                    },\n",
+    "                    {\n",
+    "                        \"type\": \"long\",\n",
+    "                        \"name\": \"upvotes\"\n",
+    "                    },\n",
+    "                    {\n",
+    "                        \"type\": \"long\",\n",
+    "                        \"name\": \"comments\"\n",
+    "                    },\n",
+    "                    \"edited\"\n",
+    "                ]\n",
+    "            },\n",
+    "            \"granularitySpec\": {\n",
+    "                \"queryGranularity\": \"none\",\n",
+    "                \"rollup\": False,\n",
+    "                \"segmentGranularity\": \"hour\"\n",
+    "            }\n",
+    "        }\n",
+    "    }\n",
+    "}"
    ]
   },
   {
@@ -287,14 +295,26 @@
     "  'Content-Type': 'application/json'\n",
     "}\n",
     "\n",
-    "rest_client.post(\"/druid/indexer/v1/supervisor\", kafka_ingestion_spec, 
headers=headers)"
+    "supervisor = rest_client.post(\"/druid/indexer/v1/supervisor\", 
json.dumps(kafka_ingestion_spec), headers=headers)\n",
+    "print(supervisor.status_code)"
    ]
   },
   {
    "cell_type": "markdown",
    "metadata": {},
    "source": [
-    "A `200` response indicates that the request was successful. You can view 
the running ingestion task and the new datasource in the web console at 
http://localhost:8888/unified-console.html.";
+    "A `200` response indicates that the request was successful. You can view 
the running ingestion task and the new datasource in the web console's 
[ingestion view](http://localhost:8888/unified-console.html#ingestion).\n",
+    "\n",
+    "The following cell pauses further execution until the ingestion has 
started and the datasource is available for querying:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "druid.sql.wait_until_ready('social_media', verify_load_status=False)"
    ]
   },
   {
@@ -496,8 +516,49 @@
    "cell_type": "markdown",
    "metadata": {},
    "source": [
-    "This plot shows how some users maintain relatively consistent social 
media impact between the two query snapshots, whereas other users grow or 
decline in their influence.\n",
-    "\n",
+    "This plot shows how some users maintain relatively consistent social 
media impact between the two query snapshots, whereas other users grow or 
decline in their influence."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Cleanup \n",
+    "The following cells stop the data generation and ingestion jobs and 
removes the datasource from Druid."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "print(f\"Stop streaming generator: 
[{datagen.post('/stop/social_stream','',require_ok=False)}]\")\n",
+    "print(f'Reset offsets for ingestion: 
[{druid.rest.post(\"/druid/indexer/v1/supervisor/social_media/reset\",\"\", 
require_ok=False)}]')\n",
+    "print(f'Stop streaming ingestion: 
[{druid.rest.post(\"/druid/indexer/v1/supervisor/social_media/terminate\",\"\", 
require_ok=False)}]')"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Once the ingestion process ends and completes any final ingestion steps, 
remove the datasource with the following cell:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "time.sleep(5) # wait for streaming ingestion tasks to end\n",
+    "print(f\"Drop datasource: [{druid.datasources.drop('social_media')}]\")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
     "## Learn more\n",
     "\n",
     "This tutorial showed you how to create a Kafka topic using a Python 
client for Kafka, send a simulated stream of data to Kafka using a data 
generator, and query and visualize results over time. For more information, see 
the following resources:\n",
diff --git 
a/examples/quickstart/jupyter-notebooks/notebooks/02-ingestion/DruidDataDriver.py
 
b/examples/quickstart/jupyter-notebooks/notebooks/02-ingestion/DruidDataDriver.py
deleted file mode 100644
index 5acd25210b..0000000000
--- 
a/examples/quickstart/jupyter-notebooks/notebooks/02-ingestion/DruidDataDriver.py
+++ /dev/null
@@ -1,1133 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# DruidDataDriver - generates JSON records as a workload for Apache Druid.
-#
-
-import argparse
-import dateutil.parser
-from datetime import datetime, timedelta
-import json
-import numpy as np
-import random
-import re
-from sortedcontainers import SortedList
-import string
-import sys
-import threading
-import time
-
-############################################################################
-#
-# DruidDataDriver simulates Druid workloads by producing JSON records.
-# Use a JSON config file to describe the characteristics of the workload
-# you want to simulate.
-#
-# Run the program as follows:
-# python DruidDataDriver.py <config file name> <options>
-# Options include:
-# -n <total number of records to generate>
-# -t <duration for generating records>
-#
-# See the associated documentation for the format of the config file.
-#
-############################################################################
-
-
-class FutureEvent:
-    def __init__(self, t):
-        self.t = t
-        self.name = threading.current_thread().name
-        self.event = threading.Event()
-    def get_time(self):
-        return self.t
-    def get_name(self):
-        return self.name
-    def __lt__(self, other):
-        return self.t < other.t
-    def __eq__(self, other):
-        return self.t == other.t
-    def __le__(self, other):
-        return self.t <= other.t
-    def __gt__(self, other):
-        return self.t > other.t
-    def __ge__(self, other):
-        return self.t >= other.t
-
-    def __str__(self):
-        return 'FutureEvent('+self.name+', '+str(self.t)+')'
-    def pause(self):
-        #print(self.name+" pausing")
-        self.event.clear()
-        self.event.wait()
-    def resume(self):
-        #print(self.name+" resuming")
-        self.event.set()
-
-class Clock:
-    future_events = SortedList()
-    active_threads = 0
-    lock = threading.Lock()
-    sleep_lock = threading.Lock()
-
-    def __init__(self, time_type, start_time = datetime.now()):
-        self.sim_time = start_time
-        self.time_type = time_type
-
-    def __str__(self):
-        s = 'Clock(time='+str(self.sim_time)
-        for e in self.future_events:
-            s += ', '+str(e)
-        s += ')'
-        return s
-
-    def activate_thread(self):
-        if self.time_type == 'SIM':
-            self.lock.acquire()
-            self.active_threads += 1
-            self.lock.release()
-
-    def deactivate_thread(self):
-        if self.time_type == 'SIM':
-            self.lock.acquire()
-            self.active_threads -= 1
-            self.lock.release()
-
-    def end_thread(self):
-        if self.time_type == 'SIM':
-            self.lock.acquire()
-            self.active_threads -= 1
-            if len(self.future_events) > 0:
-                self.remove_event().resume()
-            self.lock.release()
-
-    def release_all(self):
-        if self.time_type == 'SIM':
-            self.lock.acquire()
-            #print('release_all - active_threads = '+str(self.active_threads))
-            for e in self.future_events:
-                e.resume()
-            self.lock.release()
-
-    def add_event(self, future_t):
-        this_event = FutureEvent(future_t)
-        self.future_events.add(this_event)
-        #print('add_event (after) '+threading.current_thread().name+' - 
'+str(self))
-        return this_event
-
-    def remove_event(self):
-        #print('remove_event (before) '+threading.current_thread().name+' - 
'+str(self))
-        next_event = self.future_events[0]
-        self.future_events.remove(next_event)
-        return next_event
-
-    def pause(self, event):
-        self.active_threads -= 1
-        self.lock.release()
-        event.pause()
-        self.lock.acquire()
-        self.active_threads += 1
-
-    def resume(self, event):
-        event.resume()
-
-    def now(self):
-        if self.time_type == 'SIM':
-            t = self.sim_time
-        else:
-            t = datetime.now()
-        return t
-
-    def sleep(self, delta):
-        if self.time_type == 'SIM': # Simulated time
-            self.lock.acquire()
-            #print(threading.current_thread().name+" begin sleep 
"+str(self.sim_time)+" + "+str(delta))
-            this_event = self.add_event(self.sim_time + 
timedelta(seconds=delta))
-            #print(threading.current_thread().name+" active threads 
"+str(self.active_threads))
-            if self.active_threads == 1:
-                next_event = self.remove_event()
-                if str(this_event) != str(next_event):
-                    self.resume(next_event)
-                    #print(threading.current_thread().name+" start pause if")
-                    self.pause(this_event)
-                    #print(threading.current_thread().name+" end pause if")
-            else:
-                #print(threading.current_thread().name+" start pause else")
-                self.pause(this_event)
-                #print(threading.current_thread().name+" end pause else")
-            self.sim_time = this_event.get_time()
-            #print(threading.current_thread().name+" end sleep 
"+str(self.sim_time))
-            self.lock.release()
-
-        else: # Real time
-            time.sleep(delta)
-
-
-#
-# Set up the target
-#
-
-class PrintStdout:
-    lock = threading.Lock()
-    def print(self, record):
-        with self.lock:
-            print(str(record))
-            sys.stdout.flush()
-    def __str__(self):
-        return '#printStdout()'
-
-class PrintFile:
-    f = None
-    def __init__(self, file_name):
-        self.file_name = file_name
-        self.f = open(file_name, 'w')
-    def __del__(self):
-        if self.f != None:
-            self.f.close()
-    def __str__(self):
-        return 'PrintFile(file_name='+self.file_name+')'
-    def print(self, record):
-        self.f.write(str(record)+'\n')
-        self.f.flush()
-
-class PrintKafka:
-    producer = None
-    topic = None
-    def __init__(self, endpoint, topic, security_protocol, compression_type):
-        from kafka import KafkaProducer
-
-        #print('PrintKafka('+str(endpoint)+', '+str(topic)+', 
'+str(security_protocol)+', '+str(compression_type)+')')
-        self.endpoint = endpoint
-        self.producer = KafkaProducer(bootstrap_servers=endpoint, 
security_protocol=security_protocol, compression_type=compression_type, 
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
-        self.topic = topic
-    def __str__(self):
-        return 'PrintKafka(endpoint='+self.endpoint+', topic='+self.topic+')'
-    def print(self, record):
-        self.producer.send(self.topic, json.loads(str(record)))
-
-class PrintConfluent:
-    producer = None
-    topic = None
-    username = None
-    password = None
-    def __init__(self, servers, topic, username, password):
-        from confluent_kafka import Producer
-
-        #print('PrintKafka('+str(endpoint)+', '+str(topic)+', 
'+str(security_protocol)+', '+str(compression_type)+')')
-        self.servers = servers
-        self.producer = Producer({
-            'bootstrap.servers': servers,
-            'sasl.mechanisms': 'PLAIN',
-            'security.protocol': 'SASL_SSL',
-            'sasl.username': username,
-            'sasl.password': password
-        })
-        self.topic = topic
-        self.username = username
-        self.password = password
-    def __str__(self):
-        return 'PrintConfluent(servers='+self.servers+', topic='+self.topic+', 
username='+self.username+', password='+self.password+')'
-    def print(self, record):
-        print('producing '+str(record))
-        self.producer.produce(topic=self.topic, value=str(record))
-        self.producer.flush()
-
-
-#
-# Handle distributions
-#
-
-class DistConstant:
-    def __init__(self, value):
-        self.value = value
-    def __str__(self):
-        return 'DistConstant(value='+str(self.value)+')'
-    def get_sample(self):
-        return self.value
-
-class DistUniform:
-    def __init__(self, min_value, max_value):
-        self.min_value = min_value
-        self.max_value = max_value
-    def __str__(self):
-        return 'DistUniform(min_value='+str(self.min_value)+', 
max_value='+str(self.max_value)+')'
-    def get_sample(self):
-        return np.random.uniform(self.min_value, self.max_value+1)
-
-class DistExponential:
-    def __init__(self, mean):
-        self.mean = mean
-    def __str__(self):
-        return 'DistExponential(mean='+str(self.mean)+')'
-    def get_sample(self):
-        return np.random.exponential(scale = self.mean)
-
-class DistNormal:
-    def __init__(self, mean, stddev):
-        self.mean = mean
-        self.stddev = stddev
-    def __str__(self):
-        return 'DistNormal(mean='+str(self.mean )+', 
stddev='+str(self.stddev)+')'
-    def get_sample(self):
-        return np.random.normal(self.mean, self.stddev)
-
-def parse_distribution(desc):
-    dist_type = desc['type'].lower()
-    dist_gen = None
-    if dist_type == 'constant':
-        value = desc['value']
-        dist_gen = DistConstant(value)
-    elif dist_type == 'uniform':
-        min_value = desc['min']
-        max_value = desc['max']
-        dist_gen = DistUniform(min_value, max_value)
-    elif dist_type == 'exponential':
-        mean = desc['mean']
-        dist_gen = DistExponential(mean)
-    elif dist_type == 'normal':
-        mean = desc['mean']
-        stddev = desc['stddev']
-        dist_gen = DistNormal(mean, stddev)
-    else:
-        print('Error: Unknown distribution "'+dist_type+'"')
-        exit()
-    return dist_gen
-
-def parse_timestamp_distribution(desc):
-    dist_type = desc['type'].lower()
-    dist_gen = None
-    if dist_type == 'constant':
-        value = dateutil.parser.isoparse(desc['value']).timestamp()
-        dist_gen = DistConstant(value)
-    elif dist_type == 'uniform':
-        min_value = dateutil.parser.isoparse(desc['min']).timestamp()
-        max_value = dateutil.parser.isoparse(desc['max']).timestamp()
-        dist_gen = DistUniform(min_value, max_value)
-    elif dist_type == 'exponential':
-        mean = dateutil.parser.isoparse(desc['mean']).timestamp()
-        dist_gen = DistExponential(mean)
-    elif dist_type == 'normal':
-        mean = desc[dateutil.parser.isoparse(desc['mean']).timestamp()]
-        stddev = desc['stddev']
-        dist_gen = DistNormal(mean, stddev)
-    else:
-        print('Error: Unknown distribution "'+dist_type+'"')
-        exit()
-    return dist_gen
-
-
-#
-# Set up the dimensions for the emitters (see below)
-# There is one element class for each dimension type. This code creates a list 
of
-# elements and then runs through the list to create a single record.
-# Notice that the get_json_field_string() method produces the JSON dimension
-# field object based on the dimension configuration.
-# The get_stochastic_value() method is like a private method used to get a 
random
-# idividual value.
-#
-
-class ElementNow: # The __time dimension
-    def __init__(self, global_clock):
-        self.global_clock = global_clock
-    def __str__(self):
-        return 'ElementNow()'
-    def get_json_field_string(self):
-        now = self.global_clock.now().isoformat()[:-3]
-        return '"__time":"'+now+'"'
-
-class ElementCounter: # The __time dimension
-    def __init__(self, desc):
-        self.name = desc['name']
-        if 'percent_nulls' in desc.keys():
-            self.percent_nulls = desc['percent_nulls'] / 100.0
-        else:
-            self.percent_nulls = 0.0
-        if 'percent_missing' in desc.keys():
-            self.percent_missing = desc['percent_missing'] / 100.0
-        else:
-            self.percent_missing = 0.0
-        if 'start' in desc.keys():
-            self.start = desc['start']
-        else:
-            self.start = 0
-        if 'increment' in desc.keys():
-            self.increment = desc['increment']
-        else:
-            self.increment = 1
-        self.value = self.start
-    def __str__(self):
-        s = 'ElementCounter(name='+self.name
-        if self.start != 0:
-            s += ', '+str(self.start)
-        if self.increment != 1:
-            s += ', '+str(self.increment)
-        s += ')'
-        return s
-
-    def get_stochastic_value(self):
-        v = self.value
-        self.value += self.increment
-        return v
-
-    def get_json_field_string(self):
-        if random.random() < self.percent_nulls:
-            s = '"'+self.name+'": null'
-        else:
-            s = '"'+self.name+'":"'+str(self.get_stochastic_value())+'"'
-        return s
-
-    def is_missing(self):
-        return random.random() < self.percent_missing
-
-
-class ElementEnum: # enumeration dimensions
-    def __init__(self, desc):
-        self.name = desc['name']
-        if 'percent_nulls' in desc.keys():
-            self.percent_nulls = desc['percent_nulls'] / 100.0
-        else:
-            self.percent_nulls = 0.0
-        if 'percent_missing' in desc.keys():
-            self.percent_missing = desc['percent_missing'] / 100.0
-        else:
-            self.percent_missing = 0.0
-        self.cardinality = desc['values']
-        if 'cardinality_distribution' not in desc.keys():
-            print('Element '+self.name+' specifies a cardinality without a 
cardinality distribution')
-            exit()
-        self.cardinality_distribution = 
parse_distribution(desc['cardinality_distribution'])
-
-    def __str__(self):
-        return 'ElementEnum(name='+self.name+', 
cardinality='+str(self.cardinality)+', 
cardinality_distribution='+str(self.cardinality_distribution)+')'
-
-    def get_stochastic_value(self):
-        index = int(self.cardinality_distribution.get_sample())
-        if index < 0:
-            index = 0
-        if index >= len(self.cardinality):
-            index = len(self.cardinality)-1
-        return self.cardinality[index]
-
-    def get_json_field_string(self):
-        if random.random() < self.percent_nulls:
-            s = '"'+self.name+'": null'
-        else:
-            s = '"'+self.name+'":"'+str(self.get_stochastic_value())+'"'
-        return s
-
-    def is_missing(self):
-        return random.random() < self.percent_missing
-
-class ElementVariable: # Variable dimensions
-    def __init__(self, desc):
-        self.name = desc['name']
-        self.variable_name = desc['variable']
-
-    def __str__(self):
-        return 'ElementVariable(name='+self.name+', 
value='+self.variable_name+')'
-
-    def get_json_field_string(self, variables): # NOTE: because of timing, 
this method has a different signature than the other elements
-        value = variables[self.variable_name]
-        return '"'+self.name+'":"'+str(value)+'"'
-
-# TODO: Refactor ElementBase and subclasses, and those element classes that 
don't inherit from ElementBase
-class ElementBase: # Base class for the remainder of the dimensions
-    def __init__(self, desc):
-        self.name = desc['name']
-        if 'percent_nulls' in desc.keys():
-            self.percent_nulls = desc['percent_nulls'] / 100.0
-        else:
-            self.percent_nulls = 0.0
-        if 'percent_missing' in desc.keys():
-            self.percent_missing = desc['percent_missing'] / 100.0
-        else:
-            self.percent_missing = 0.0
-
-        self.cardinality_setting = desc['cardinality']
-        self.cardinality_distribution = None
-
-        if self.cardinality_setting == 0:
-            self.cardinality = None
-
-        else:
-            self.cardinality = []
-            if 'cardinality_distribution' not in desc.keys():
-                print('Element '+self.name+' specifies a cardinality without a 
cardinality distribution')
-                exit()
-            self.cardinality_distribution = 
parse_distribution(desc['cardinality_distribution'])
-            self.init_cardinality()
-
-    def init_cardinality(self):
-        for i in range(self.cardinality_setting):
-            value = None
-            while True:
-                value = self.get_stochastic_value()
-                if value not in self.cardinality:
-                    break
-            self.cardinality.append(value)
-
-
-    def get_stochastic_value(self):
-        pass
-
-    def get_json_field_string(self):
-        if random.random() < self.percent_nulls:
-            s = '"'+self.name+'": null'
-        else:
-            if self.cardinality is None:
-                value = self.get_stochastic_value()
-            else:
-                index = int(self.cardinality_distribution.get_sample())
-                if index < 0:
-                    index = 0
-                if index >= len(self.cardinality):
-                    index = len(self.cardinality)-1
-                value = self.cardinality[index]
-            s = '"'+self.name+'":'+str(value)
-        return s
-
-    def is_missing(self):
-        return random.random() < self.percent_missing
-
-
-class ElementString(ElementBase):
-
-    def __init__(self, desc):
-        self.length_distribution = 
parse_distribution(desc['length_distribution'])
-        if 'chars' in desc:
-            self.chars = desc['chars']
-        else:
-            self.chars = string.printable
-        super().__init__(desc)
-
-    def __str__(self):
-        return 'ElementString(name='+self.name+', 
cardinality='+str(self.cardinality)+', 
cardinality_distribution='+str(self.cardinality_distribution)+', 
chars='+self.chars+')'
-
-    def get_stochastic_value(self):
-        length = int(self.length_distribution.get_sample())
-        return ''.join(random.choices(list(self.chars), k=length))
-
-    def get_json_field_string(self):
-        if random.random() < self.percent_nulls:
-            s = '"'+self.name+'": null'
-        else:
-            if self.cardinality is None:
-                value = self.get_stochastic_value()
-            else:
-                index = int(self.cardinality_distribution.get_sample())
-                if index < 0:
-                    index = 0
-                if index >= len(self.cardinality):
-                    index = len(self.cardinality)-1
-                value = self.cardinality[index]
-            s = '"'+self.name+'":"'+str(value)+'"'
-        return s
-
-class ElementInt(ElementBase):
-    def __init__(self, desc):
-        self.value_distribution = parse_distribution(desc['distribution'])
-        super().__init__(desc)
-
-    def __str__(self):
-        return 'ElementInt(name='+self.name+', 
value_distribution='+str(self.value_distribution)+', 
cardinality='+str(self.cardinality)+', 
cardinality_distribution='+str(self.cardinality_distribution)+')'
-
-    def get_stochastic_value(self):
-        return int(self.value_distribution.get_sample())
-
-class ElementFloat(ElementBase):
-    def __init__(self, desc):
-        self.value_distribution = parse_distribution(desc['distribution'])
-        if 'precision' in desc:
-            self.precision = desc['precision']
-        else:
-            self.precision = None
-        super().__init__(desc)
-
-    def __str__(self):
-        return 'ElementFloat(name='+self.name+', 
value_distribution='+str(self.value_distribution)+', 
cardinality='+str(self.cardinality)+', 
cardinality_distribution='+str(self.cardinality_distribution)+')'
-
-    def get_stochastic_value(self):
-        return float(self.value_distribution.get_sample())
-
-    def get_json_field_string(self):
-        if random.random() < self.percent_nulls:
-            s = '"'+self.name+'": null'
-        else:
-            if self.cardinality is None:
-                value = self.get_stochastic_value()
-            else:
-                index = int(self.cardinality_distribution.get_sample())
-                if index < 0:
-                    index = 0
-                if index >= len(self.cardinality):
-                    index = len(self.cardinality)-1
-                value = self.cardinality[index]
-            if self.precision is None:
-                s = '"'+self.name+'":'+str(value)
-            else:
-                format = '%.'+str(self.precision)+'f'
-                s = '"'+self.name+'":'+str(format%value)
-        return s
-
-class ElementTimestamp(ElementBase):
-    def __init__(self, desc):
-        super().__init__(desc)
-
-    def __str__(self):
-        return 'ElementTimestamp(name='+self.name+', 
value_distribution='+str(self.value_distribution)+', 
cardinality='+str(self.cardinality)+', 
cardinality_distribution='+str(self.cardinality_distribution)+')'
-
-    def get_stochastic_value(self):
-        return 
datetime.fromtimestamp(self.value_distribution.get_sample()).isoformat()[:-3]
-
-    def get_json_field_string(self):
-        if random.random() < self.percent_nulls:
-            s = '"'+self.name+'": null'
-        else:
-            if self.cardinality is None:
-                value = self.get_stochastic_value()
-            else:
-                index = int(self.cardinality_distribution.get_sample())
-                if index < 0:
-                    index = 0
-                if index >= len(self.cardinality):
-                    index = len(self.cardinality)-1
-                value = self.cardinality[index]
-            s = '"'+self.name+'":"'+str(value)+'"'
-        return s
-
-    def is_missing(self):
-        return random.random() < self.percent_missing
-
-class ElementIPAddress(ElementBase):
-    def __init__(self, desc):
-        self.value_distribution = parse_distribution(desc['distribution'])
-        super().__init__(desc)
-
-    def __str__(self):
-        return 'ElementIPAddress(name='+self.name+', 
value_distribution='+str(self.value_distribution)+', 
cardinality='+str(self.cardinality)+', 
cardinality_distribution='+str(self.cardinality_distribution)+')'
-
-    def get_stochastic_value(self):
-        value = int(self.value_distribution.get_sample())
-        return str((value & 0xFF000000) >> 24)+'.'+str((value & 0x00FF0000) >> 
16)+'.'+str((value & 0x0000FF00) >> 8)+'.'+str(value & 0x000000FF)
-
-    def get_json_field_string(self):
-        if random.random() < self.percent_nulls:
-            s = '"'+self.name+'": null'
-        else:
-            if self.cardinality is None:
-                value = self.get_stochastic_value()
-            else:
-                index = int(self.cardinality_distribution.get_sample())
-                if index < 0:
-                    index = 0
-                if index >= len(self.cardinality):
-                    index = len(self.cardinality)-1
-                value = self.cardinality[index]
-            s = '"'+self.name+'":"'+str(value)+'"'
-        return s
-
-class ElementObject():
-    def __init__(self, desc):
-        self.name = desc['name']
-        self.dimensions = get_variables(desc['dimensions'])
-        if 'percent_nulls' in desc.keys():
-            self.percent_nulls = desc['percent_nulls'] / 100.0
-        else:
-            self.percent_nulls = 0.0
-        if 'percent_missing' in desc.keys():
-            self.percent_missing = desc['percent_missing'] / 100.0
-        else:
-            self.percent_missing = 0.0
-        cardinality = desc['cardinality']
-        if cardinality == 0:
-            self.cardinality = None
-            self.cardinality_distribution = None
-        else:
-            self.cardinality = []
-            if 'cardinality_distribution' not in desc.keys():
-                print('Element '+self.name+' specifies a cardinality without a 
cardinality distribution')
-                exit()
-            self.cardinality_distribution = 
parse_distribution(desc['cardinality_distribution'])
-            for i in range(cardinality):
-                value = None
-                while True:
-                    value = self.get_instance()
-                    if value not in self.cardinality:
-                        break
-                self.cardinality.append(value)
-
-    def __str__(self):
-        s = 'ElementObject(name='+self.name+', dimensions=['
-        for e in self.dimensions:
-            s += ',' + str(e)
-        s += '])'
-        return s
-
-    def get_instance(self):
-        s = '"'+self.name+'": {'
-        for e in self.dimensions:
-            s += e.get_json_field_string() + ','
-        s = s[:-1] +  '}'
-        return s
-
-
-    def get_json_field_string(self):
-        if random.random() < self.percent_nulls:
-            s = '"'+self.name+'": null'
-        else:
-            if self.cardinality is None:
-                s = self.get_instance()
-            else:
-                index = int(self.cardinality_distribution.get_sample())
-                if index < 0:
-                    index = 0
-                if index >= len(self.cardinality):
-                    index = len(self.cardinality)-1
-                s = self.cardinality[index]
-        return s
-
-    def is_missing(self):
-        return random.random() < self.percent_missing
-
-class ElementList():
-    def __init__(self, desc):
-        self.name = desc['name']
-        self.elements = get_variables(desc['elements'])
-        self.length_distribution = 
parse_distribution(desc['length_distribution'])
-        self.selection_distribution = 
parse_distribution(desc['selection_distribution'])
-        if 'percent_nulls' in desc.keys():
-            self.percent_nulls = desc['percent_nulls'] / 100.0
-        else:
-            self.percent_nulls = 0.0
-        if 'percent_missing' in desc.keys():
-            self.percent_missing = desc['percent_missing'] / 100.0
-        else:
-            self.percent_missing = 0.0
-        cardinality = desc['cardinality']
-        if cardinality == 0:
-            self.cardinality = None
-            self.cardinality_distribution = None
-        else:
-            self.cardinality = []
-            if 'cardinality_distribution' not in desc.keys():
-                print('Element '+self.name+' specifies a cardinality without a 
cardinality distribution')
-                exit()
-            self.cardinality_distribution = 
parse_distribution(desc['cardinality_distribution'])
-            for i in range(cardinality):
-                value = None
-                while True:
-                    value = self.get_instance()
-                    if value not in self.cardinality:
-                        break
-                self.cardinality.append(value)
-
-    def __str__(self):
-        s = 'ElementObject(name='+self.name
-        s += ', length_distribution='+str(self.length_distribution)
-        s += ', selection_distribution='+str(self.selection_distribution)
-        s += ', elements=['
-        for e in self.elements:
-            s += ',' + str(e)
-        s += '])'
-        return s
-
-    def get_instance(self):
-        s = '"'+self.name+'": ['
-        length = int(self.length_distribution.get_sample())
-        for i in range(length):
-            index = int(self.selection_distribution.get_sample())
-            if index < 0:
-                index = 0
-            if index >= length:
-                index = length-1
-            s += re.sub('^.*?:', '', 
self.elements[index].get_json_field_string(), count=1) + ','
-        s = s[:-1] +  ']'
-        return s
-
-
-    def get_json_field_string(self):
-        if random.random() < self.percent_nulls:
-            s = '"'+self.name+'": null'
-        else:
-            if self.cardinality is None:
-                s = self.get_instance()
-            else:
-                index = int(self.cardinality_distribution.get_sample())
-                if index < 0:
-                    index = 0
-                if index >= len(self.cardinality):
-                    index = len(self.cardinality)-1
-                s = self.cardinality[index]
-        return s
-
-    def is_missing(self):
-        return random.random() < self.percent_missing
-
-
-def parse_element(desc):
-    if desc['type'].lower() == 'counter':
-        el = ElementCounter(desc)
-    elif desc['type'].lower() == 'enum':
-        el = ElementEnum(desc)
-    elif desc['type'].lower() == 'string':
-        el = ElementString(desc)
-    elif desc['type'].lower() == 'int':
-        el = ElementInt(desc)
-    elif desc['type'].lower() == 'float':
-        el = ElementFloat(desc)
-    elif desc['type'].lower() == 'timestamp':
-        el = ElementTimestamp(desc)
-    elif desc['type'].lower() == 'ipaddress':
-        el = ElementIPAddress(desc)
-    elif desc['type'].lower() == 'variable':
-        el = ElementVariable(desc)
-    elif desc['type'].lower() == 'object':
-        el = ElementObject(desc)
-    elif desc['type'].lower() == 'list':
-        el = ElementList(desc)
-    else:
-        print('Error: Unknown dimension type "'+desc['type']+'"')
-        exit()
-    return el
-
-
-def get_variables(desc):
-    elements = []
-    for element in desc:
-        el = parse_element(element)
-        elements.append(el)
-    return elements
-
-def get_dimensions(desc, global_clock):
-    elements = get_variables(desc)
-    elements.insert(0, ElementNow(global_clock))
-    return elements
-
-
-#
-# Set up the state machine
-#
-
-class Transition:
-    def __init__(self, next_state, probability):
-        self.next_state = next_state
-        self.probability = probability
-
-    def __str__(self):
-        return 'Transition(next_state='+str(self.next_state)+', 
probability='+str(self.probability)+')'
-
-def parse_transitions(desc):
-    transitions = []
-    for trans in desc:
-        next_state = trans['next']
-        probability = float(trans['probability'])
-        transitions.append(Transition(next_state, probability))
-    return transitions
-
-class State:
-    def __init__(self, name, dimensions, delay, transitions, variables):
-        self.name = name
-        self.dimensions = dimensions
-        self.delay = delay
-        self.transistion_states = [t.next_state for t in transitions]
-        self.transistion_probabilities = [t.probability for t in transitions]
-        self.variables = variables
-
-    def __str__(self):
-        return 'State(name='+self.name+', dimensions='+str([str(d) for d in 
self.dimensions])+', delay='+str(self.delay)+', 
transistion_states='+str(self.transistion_states)+', 
transistion_probabilities='+str(self.transistion_probabilities)+'variables='+str([str(v)
 for v in self.variables])+')'
-
-    def get_next_state_name(self):
-        return random.choices(self.transistion_states, 
weights=self.transistion_probabilities, k=1)[0]
-
-class SimEnd:
-    lock = threading.Lock()
-    thread_end_event = threading.Event()
-    def __init__(self, total_recs, runtime, global_clock):
-        self.total_recs = total_recs
-        self.record_count = 0
-        self.global_clock = global_clock
-        if runtime is None:
-            self.t = None
-        else:
-            if runtime[-1].lower() == 's':
-                self.t = int(runtime[:-1])
-            elif runtime[-1].lower() == 'm':
-                self.t = int(runtime[:-1]) * 60
-            elif runtime[-1].lower() == 'h':
-                self.t = int(runtime[:-1]) * 60 * 60
-            else:
-                print('Error: Unknown runtime value"'+runtime+'"')
-                exit()
-
-    def inc_rec_count(self):
-        self.lock.acquire()
-        self.record_count += 1
-        self.lock.release()
-        if (self.total_recs is not None) and (self.record_count >= 
self.total_recs):
-            self.thread_end_event.set()
-
-    def is_done(self):
-        return ((self.total_recs is not None) and (self.record_count >= 
self.total_recs)) or ((self.t is not None) and self.thread_end_event.is_set())
-
-    def wait_for_end(self):
-        if self.t is not None:
-            self.global_clock.activate_thread()
-            self.global_clock.sleep(self.t)
-            self.thread_end_event.set()
-            self.global_clock.deactivate_thread()
-        elif self.total_recs is not None:
-            self.thread_end_event.wait()
-            self.global_clock.release_all()
-        else:
-            while True:
-                time.sleep(60)
-
-
-#
-# Run the driver
-#
-
-def create_record(dimensions, variables):
-    json_string = '{'
-    for element in dimensions:
-        if isinstance(element, ElementVariable):
-            json_string += element.get_json_field_string(variables) + ','
-        else:
-            if isinstance(element, ElementNow) or not element.is_missing():
-                json_string += element.get_json_field_string() + ','
-    json_string = json_string[:-1] + '}'
-    return json_string
-
-def set_variable_values(variables, dimensions):
-    for d in dimensions:
-        variables[d.name] = d.get_stochastic_value()
-
-def worker_thread(target_printer, states, initial_state, sim_end, 
global_clock):
-    # Process the state machine using worker threads
-    #print('Thread '+threading.current_thread().name+' starting...')
-    global_clock.activate_thread()
-    current_state = initial_state
-    variables = {}
-    while True:
-        set_variable_values(variables, current_state.variables)
-        record = create_record(current_state.dimensions, variables)
-        target_printer.print(record)
-        sim_end.inc_rec_count()
-        if sim_end.is_done():
-            break
-        delta = float(current_state.delay.get_sample())
-        global_clock.sleep(delta)
-        if sim_end.is_done():
-            break
-        next_state_name = current_state.get_next_state_name()
-        if next_state_name.lower() == 'stop':
-            break
-        current_state = states[next_state_name]
-
-    #print('Thread '+threading.current_thread().name+' done!')
-    global_clock.end_thread()
-
-def spawning_thread(target_printer, rate_delay, states, initial_state, 
sim_end, global_clock):
-    #print('Thread '+threading.current_thread().name+' starting...')
-    global_clock.activate_thread()
-    # Spawn the workers in a separate thread so we can stop the whole thing in 
the middle of spawning if necessary
-    count = 0
-    while not sim_end.is_done():
-        thread_name = 'W'+str(count)
-
-        count += 1
-        t = threading.Thread(target=worker_thread, args=(target_printer, 
states, initial_state, sim_end, global_clock, ), name=thread_name, daemon=True)
-        t.start()
-        global_clock.sleep(float(rate_delay.get_sample()))
-    global_clock.end_thread()
-    #print('Thread '+threading.current_thread().name+' done!')
-
-
-def simulate(config_file_name, runtime, total_recs, time_type, start_time):
-
-    if config_file_name:
-        with open(config_file_name, 'r') as f:
-            config = json.load(f)
-    else:
-        config = json.load(sys.stdin)
-
-    #
-    # Set up the gloabl clock
-    #
-
-    global_clock = Clock(time_type, start_time)
-    sim_end = SimEnd(total_recs, runtime, global_clock)
-
-
-    #
-    # Set up the output target
-    #
-
-    target = config['target']
-
-    if target['type'].lower() == 'stdout':
-        target_printer = PrintStdout()
-    elif target['type'].lower() == 'file':
-        path = target['path']
-        if path is None:
-            print('Error: File target requires a path item')
-            exit()
-        target_printer = PrintFile(path)
-    elif target['type'].lower() == 'kafka':
-        if 'endpoint' in target.keys():
-            endpoint = target['endpoint']
-        else:
-            print('Error: Kafka target requires an endpoint item')
-            exit()
-        if 'topic' in target.keys():
-            topic = target['topic']
-        else:
-            print('Error: Kafka target requires a topic item')
-            exit()
-        if 'security_protocol' in target.keys():
-            security_protocol = target['security_protocol']
-        else:
-            security_protocol = 'PLAINTEXT'
-        if 'compression_type' in target.keys():
-            compression_type = target['compression_type']
-        else:
-            compression_type = None
-        target_printer = PrintKafka(endpoint, topic, security_protocol, 
compression_type)
-    elif target['type'].lower() == 'confluent':
-        if 'servers' in target.keys():
-            servers = target['servers']
-        else:
-            print('Error: Conlfuent target requires a servers item')
-            exit()
-        if 'topic' in target.keys():
-            topic = target['topic']
-        else:
-            print('Error: Confluent target requires a topic item')
-            exit()
-        if 'username' in target.keys():
-            username = target['username']
-        else:
-            print('Error: Confluent target requires a username')
-            exit()
-        if 'password' in target.keys():
-            password = target['password']
-        else:
-            print('Error: Confluent target requires a password')
-            exit()
-        target_printer = PrintConfluent(servers, topic, username, password)
-    else:
-        print('Error: Unknown target type "'+target['type']+'"')
-        exit()
-
-    #sys.stderr.write('target='+str(target_printer)+'\n')
-
-
-    #
-    # Set up the interarrival rate
-    #
-
-    rate = config['interarrival']
-    rate_delay = parse_distribution(rate)
-
-    #sys.stderr.write('rate_delay='+str(rate_delay)+'\n')
-
-
-    #
-    # Set up emitters list
-    #
-
-    emitters = {}
-    for emitter in config['emitters']:
-        name = emitter['name']
-        dimensions = get_dimensions(emitter['dimensions'], global_clock)
-        emitters[name] = dimensions
-
-    #sys.stderr.write('emitters='+str(['(name='+str(key)+', 
dimensions='+str([str(e) for e in emitters[key]])+')' for key in 
emitters])+'\n')
-
-
-    #
-    # Set up the state machine
-    #
-
-    state_desc = config['states']
-    initial_state = None
-    states = {}
-    for state in state_desc:
-        name = state['name']
-        emitter_name = state['emitter']
-        if 'variables' not in state.keys():
-            variables = []
-        else:
-            variables = get_variables(state['variables'])
-        dimensions = emitters[emitter_name]
-        delay = parse_distribution(state['delay'])
-        transitions = parse_transitions(state['transitions'])
-        this_state = State(name, dimensions, delay, transitions, variables)
-        states[name] = this_state
-        if initial_state is None:
-            initial_state = this_state
-
-    #sys.stderr.write('states='+str(['('+str(key)+':'+str(states[key])+')' for 
key in states])+'\n')
-
-    #
-    # Finally, start the simulation
-    #
-
-    thrd = threading.Thread(target=spawning_thread, args=(target_printer, 
rate_delay, states, initial_state, sim_end, global_clock, ), name='Spawning', 
daemon=True)
-    thrd.start()
-    sim_end.wait_for_end()
-
-def main():
-
-    #
-    # Parse the command line
-    #
-
-    parser = argparse.ArgumentParser(description='Generates JSON records as a 
workload for Apache Druid.')
-    #parser.add_argument('config_file', metavar='<config file name>', 
help='the workload config file name')
-    parser.add_argument('-f', dest='config_file', nargs='?', help='the 
workload config file name')
-    parser.add_argument('-t', dest='time', nargs='?', help='the script runtime 
(may not be used with -n)')
-    parser.add_argument('-n', dest='n_recs', nargs='?', help='the number of 
records to generate (may not be used with -t)')
-    parser.add_argument('-s', dest='time_type', nargs='?', const='SIM', 
default='REAL', help='simulate time (default is real, not simulated)')
-
-    args = parser.parse_args()
-
-    config_file_name = args.config_file
-    runtime = args.time
-    total_recs = None
-    if args.n_recs is not None:
-        total_recs = int(args.n_recs)
-    time_type = args.time_type
-    if time_type == 'SIM':
-        start_time = datetime.now()
-    elif time_type == 'REAL':
-        start_time = datetime.now()
-    else:
-        start_time = dateutil.parser.isoparse(time_type)
-        time_type = 'SIM'
-
-
-    if (runtime is not None) and (total_recs is not None):
-        print("Use either -t or -n, but not both")
-        parser.print_help()
-        exit()
-
-
-    simulate(config_file_name, runtime, total_recs, time_type, start_time)
-
-
-
-if __name__ == "__main__":
-    main()
diff --git 
a/examples/quickstart/jupyter-notebooks/notebooks/02-ingestion/kafka_docker_config.json
 
b/examples/quickstart/jupyter-notebooks/notebooks/02-ingestion/kafka_docker_config.json
deleted file mode 100644
index 2add8f3fa1..0000000000
--- 
a/examples/quickstart/jupyter-notebooks/notebooks/02-ingestion/kafka_docker_config.json
+++ /dev/null
@@ -1,90 +0,0 @@
-{
-  "target": {
-    "type": "kafka",
-    "endpoint": "kafka:9092",
-    "topic": "social_media"
-  },
-  "emitters": [
-    {
-      "name": "example_record_1",
-      "dimensions": [
-        {
-          "type": "enum",
-          "name": "username",
-          "values": ["willow", "mia", "leon", "milton", "miette", "gus", 
"jojo", "rocket"],
-          "cardinality_distribution": {
-            "type": "uniform",
-            "min": 0,
-            "max": 7
-          }
-        },
-        {
-          "type": "string",
-          "name": "post_title",
-          "length_distribution": {"type": "uniform", "min": 1, "max": 140},
-          "cardinality": 0,
-          "chars": 
"abcdefghijklmnopqrstuvwxyz0123456789_ABCDEFGHIJKLMNOPQRSTUVWXYZ!';:,."
-        },
-        {
-          "type": "int",
-          "name": "views",
-          "distribution": {
-            "type": "exponential",
-            "mean": 10000
-          },
-          "cardinality": 0
-        },
-        {
-          "type": "int",
-          "name": "upvotes",
-          "distribution": {
-            "type": "normal",
-            "mean": 70,
-            "stddev": 20
-          },
-          "cardinality": 0
-        },
-        {
-          "type": "int",
-          "name": "comments",
-          "distribution": {
-            "type": "normal",
-            "mean": 10,
-            "stddev": 5
-          },
-          "cardinality": 0
-        },
-        {
-          "type": "enum",
-          "name": "edited",
-          "values": ["True","False"],
-          "cardinality_distribution": {
-            "type": "uniform",
-            "min": 0,
-            "max": 1
-          }
-        }
-      ]
-    }
-  ],
-  "interarrival": {
-    "type": "constant",
-    "value": 1
-  },
-  "states": [
-    {
-      "name": "state_1",
-      "emitter": "example_record_1",
-      "delay": {
-        "type": "constant",
-        "value": 1
-      },
-      "transitions": [
-        {
-          "next": "state_1",
-          "probability": 1.0
-        }
-      ]
-    }
-  ]
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to