sergioferragut commented on code in PR #14742:
URL: https://github.com/apache/druid/pull/14742#discussion_r1289235347
##########
examples/quickstart/jupyter-notebooks/notebooks/01-introduction/02-datagen-intro.ipynb:
##########
@@ -0,0 +1,618 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "id": "9e07b3f5-d919-4179-91a1-0f6b66c42757",
+ "metadata": {},
+ "source": [
+ "# Data Generator Server\n",
+ "The default Docker Compose deployment includes a data generation service
created from the published Docker image at `imply/datagen:latest`. \n",
+ "This image is built by the project
https://github.com/implydata/druid-datagenerator. \n",
+ "\n",
+ "This notebook shows you how to use the data generation service
included in the Docker Compose deployment. It explains how to use pre-defined
data generator configurations as well as how to build a custom data generator.
You will also learn how to create sample data files for batch ingestion and how
to generate live streaming data for streaming ingestion.\n",
+ "\n",
+ "## Table of contents\n",
+ "\n",
+ "* [Initialization](#Initialization)\n",
+ "* [List available configurations](#List-available-configurations)\n",
+ "* [Generate a data file for backfilling
history](#Generate-a-data-file-for-backfilling-history)\n",
+ "* [Batch Ingestion of Generated
Files](#Batch-Ingestion-of-Generated-Files)\n",
+ "* [Generate custom data](#Generate-custom-data)\n",
+ "* [Stream generated data](#Stream-generated-data)\n",
+ "* [Ingest data from a stream](#Ingest-data-from-a-stream)\n",
+ "* [Cleanup](#Cleanup)\n",
+ "\n",
+ "\n",
+ "## Initialization\n",
+ "\n",
+ "To interact with the data generation service, use the REST client
provided in the [`druidapi` Python
package](https://druid.apache.org/docs/latest/tutorials/tutorial-jupyter-index.html#python-api-for-druid)."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "f84766c7-c6a5-4496-91a3-abdb8ddd2375",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import druidapi\n",
+ "import os\n",
+ "\n",
+ "# Datagen client \n",
+ "datagen = druidapi.rest.DruidRestClient(\"http://datagen:9999\")\n",
+ "\n",
+ "if os.environ.get('DRUID_HOST') is None:\n",
+ " druid_host=f\"http://router:8888\"\n",
+ "else:\n",
+ " druid_host=f\"http://{os.environ['DRUID_HOST']}:8888\"\n",
+ "\n",
+ "# Druid client\n",
+ "druid = druidapi.jupyter_client(druid_host)\n",
+ "\n",
+ "\n",
+ "\n",
+ "# these imports and constants are used by multiple cells\n",
+ "from datetime import datetime, timedelta\n",
+ "import json\n",
+ "\n",
+ "headers = {\n",
+ " 'Content-Type': 'application/json'\n",
+ "}\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "c54af617-0998-4010-90c3-9b5a38a09a5f",
+ "metadata": {},
+ "source": [
+ "### List available configurations\n",
+ "Use the `/list` API endpoint to get the data generator's available
configuration values with pre-defined data generator schemas."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "1ba6a80a-c49b-4abf-943b-9dad82f2ae13",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "display(datagen.get(f\"/list\", require_ok=False).json())"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "ae88a3b7-60da-405d-bcf4-fb4affcfe973",
+ "metadata": {},
+ "source": [
+ "### Generate a data file for backfilling history\n",
+ "When generating a file for backfill purposes, you can select the start
time and the duration of the simulation.\n",
+ "This example shows how to configure the data generator request:\n",
+ "* `name`: an arbitrary name you assign to the job. Refer to the job name
to get the job status or to stop the job.\n",
+ "* `target.type`: \"file\" to generate a data file\n",
+ "* `target.path`: the name of the file to generate. The data generator
ignores any path specified.\n",
+ "* `time_type`,`time`: The data generator simulates the time range you
specify with a start timestamp in the `\"time_type\"` property and a duration
in the `\"time\"` property with `h` suffix for hours, `m` for minutes or `s`
for seconds.\n",
+ "* `\"concurrency\"`: the maximum number of entities used
concurrently to generate events. Each entity is a separate state machine that
simulates things like user sessions, IoT devices, or other concurrent sources
of event data. "
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "811ff58f-75af-4092-a08d-5e07a51592ff",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Configure the start time to one hour prior to the current time. \n",
+ "startDateTime = (datetime.now() - timedelta(hours =
1)).strftime('%Y-%m-%dT%H:%M:%S.001')\n",
+ "print(f\"Starting to generate history at {startDateTime}.\")\n",
+ "\n",
+ "# give the datagen job a name for use in subsequent API calls\n",
+ "job_name=\"gen_clickstream1\"\n",
+ "\n",
+ "\n",
+ "# Generate a data file on the datagen server\n",
+ "datagen_request = {\n",
+ " \"name\": job_name,\n",
+ " \"target\": { \"type\": \"file\", \"path\":\"clicks.json\"},\n",
+ " \"config_file\": \"clickstream/clickstream.json\", \n",
+ " \"time_type\": startDateTime,\n",
+ " \"time\": \"1h\",\n",
+ " \"concurrency\":100\n",
+ "}\n",
+ "response = datagen.post(\"/start\", json.dumps(datagen_request),
headers=headers, require_ok=False)\n",
+ "response.json()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "d407d1d9-3f01-4128-a014-6a5f371c25a5",
+ "metadata": {},
+ "source": [
+ "#### Display jobs\n",
+ "Use the `/jobs` API endpoint to get the current jobs and job statuses."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "3de698c5-bcf4-40c7-b295-728fb54d1f0a",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "display(datagen.get(f\"/jobs\").json())"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "972ebed0-34a1-4ad2-909d-69b8b27c3046",
+ "metadata": {},
+ "source": [
+ "#### Get status of a job\n",
+ "Use the `/status/JOB_NAME` API endpoint to get the status of a
specific job."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "debce4f8-9c16-476c-9593-21ec984985d2",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "display(datagen.get(f\"/status/{job_name}\", require_ok=False).json())"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "ef818d78-6aa6-4d38-8a43-83416aede96f",
+ "metadata": {},
+ "source": [
+ "#### Stop a job\n",
+ "Use the `/stop/JOB_NAME` API endpoint to stop a job."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "7631b8b8-d3d6-4803-9162-587f440d2ef2",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "display(datagen.post(f\"/stop/{job_name}\", '').json())"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "0a8dc7d3-64e5-41e3-8c28-c5f19c0536f5",
+ "metadata": {},
+ "source": [
+ "#### List files created on datagen server\n",
+ "Use the `/files` API endpoint to list files available on the server."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "06ee36bd-2d2b-4904-9987-10636cf52aac",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "display(datagen.get(f\"/files\", '').json())"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "83ef9edb-98e2-45b4-88e8-578703faedc1",
+ "metadata": {},
+ "source": [
+ "### Batch Ingestion of Generated Files\n",
+ "Use a [Druid HTTP input
source](https://druid.apache.org/docs/latest/ingestion/native-batch-input-sources.html#http-input-source)
in the [EXTERN
function](https://druid.apache.org/docs/latest/multi-stage-query/reference.html#extern-function)
of a [SQL-based
ingestion](https://druid.apache.org/docs/latest/multi-stage-query/index.html)
to load generated files.\n",
+ "You can access files by name using the URL
`http://datagen:9999/file/FILE_NAME`. Alternatively, if you are running Druid
outside of Docker, but on the same machine, use
`http://localhost:9999/file/FILE_NAME`.\n",
+ "The following example assumes that both Druid and the data generator
server are running in Docker Compose."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "0d72b015-f8ec-4713-b6f2-fe7a15afff59",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "sql = '''\n",
+ "REPLACE INTO \"clicks\" OVERWRITE ALL\n",
+ "WITH \"ext\" AS (SELECT *\n",
+ "FROM TABLE(\n",
+ " EXTERN(\n",
+ "
'{\"type\":\"http\",\"uris\":[\"http://datagen:9999/file/clicks.json\"]}',\n",
+ " '{\"type\":\"json\"}'\n",
+ " )\n",
+ ") EXTEND (\"time\" VARCHAR, \"user_id\" VARCHAR, \"event_type\" VARCHAR,
\"client_ip\" VARCHAR, \"client_device\" VARCHAR, \"client_lang\" VARCHAR,
\"client_country\" VARCHAR, \"referrer\" VARCHAR, \"keyword\" VARCHAR,
\"product\" VARCHAR))\n",
+ "SELECT\n",
+ " TIME_PARSE(\"time\") AS \"__time\",\n",
+ " \"user_id\",\n",
+ " \"event_type\",\n",
+ " \"client_ip\",\n",
+ " \"client_device\",\n",
+ " \"client_lang\",\n",
+ " \"client_country\",\n",
+ " \"referrer\",\n",
+ " \"keyword\",\n",
+ " \"product\"\n",
+ "FROM \"ext\"\n",
+ "PARTITIONED BY DAY\n",
+ "''' \n",
+ "\n",
+ "druid.display.run_task(sql)\n",
+ "print(\"Waiting for segment availability ...\")\n",
+ "druid.sql.wait_until_ready('clicks')\n",
+ "print(\"Data is available for query.\")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "b0997b38-02c2-483e-bd15-439c4bf0097a",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "druid.display.sql('''\n",
+ "SELECT \"event_type\", \"user_id\", count( DISTINCT \"client_ip\")
ip_count\n",
+ "FROM \"clicks\"\n",
+ "GROUP BY 1,2\n",
+ "ORDER BY 3 DESC\n",
+ "LIMIT 10\n",
+ "''')\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "66ec013f-28e4-4d5a-94a6-06e0ed537b4e",
+ "metadata": {},
+ "source": [
+ "## Generate custom data\n",
+ "\n",
+ "You can find the full set of configuration options for the Data Generator
in the
[README](https://github.com/implydata/druid-datagenerator#data-generator-configuration).\n",
+ "\n",
+ "This section demonstrates a simple custom configuration as an example.
Notice that the emitter defines the schema as a list of dimensions, and each
dimension specifies how its values are generated: "
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "d6451310-b7dd-4b39-a23b-7b735b152d6c",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "gen_config = {\n",
+ " \"emitters\": [\n",
+ " {\n",
+ " \"name\": \"simple_record\",\n",
+ " \"dimensions\": [\n",
+ " {\n",
+ " \"type\": \"string\",\n",
+ " \"name\": \"random_string_column\",\n",
+ " \"length_distribution\": {\n",
+ " \"type\": \"constant\",\n",
+ " \"value\": 13\n",
+ " },\n",
+ " \"cardinality\": 0,\n",
+ " \"chars\": \"#.abcdefghijklmnopqrstuvwxyz\"\n",
+ " },\n",
+ " {\n",
+ " \"type\": \"int\",\n",
+ " \"name\": \"distributed_number\",\n",
+ " \"distribution\": {\n",
+ " \"type\": \"uniform\",\n",
+ " \"min\": 0,\n",
+ " \"max\": 1000\n",
+ " },\n",
+ " \"cardinality\": 10,\n",
+ " \"cardinality_distribution\": {\n",
+ " \"type\": \"exponential\",\n",
+ " \"mean\": 5\n",
+ " }\n",
+ " }\n",
+ " ]\n",
+ " }\n",
+ " ],\n",
+ " \"interarrival\": {\n",
+ " \"type\": \"constant\",\n",
+ " \"value\": 1\n",
+ " },\n",
+ " \"states\": [\n",
+ " {\n",
+ " \"name\": \"state_1\",\n",
+ " \"emitter\": \"simple_record\",\n",
+ " \"delay\": {\n",
+ " \"type\": \"constant\",\n",
+ " \"value\": 1\n",
+ " },\n",
+ " \"transitions\": [\n",
+ " {\n",
+ " \"next\": \"state_1\",\n",
+ " \"probability\": 1.0\n",
+ " }\n",
+ " ]\n",
+ " }\n",
+ " ]\n",
+ "}\n",
+ "\n",
+ "target = { \"type\":\"file\", \"path\":\"sample_data.json\"}"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "89a22645-aea5-4c15-b81a-959b27df731f",
+ "metadata": {},
+ "source": [
+ "This example uses the `config` attribute of the request to configure a
new custom data generator instead of using a predefined `config_file`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "e5e5c535-3474-42b4-9772-14279e712f3d",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# generate 1 hour of simulated time using custom configuration\n",
+ "datagen_request = {\n",
+ " \"name\": \"sample_custom\",\n",
+ " \"target\": target,\n",
+ " \"config\": gen_config, \n",
+ " \"time\": \"1h\",\n",
+ " \"concurrency\":10,\n",
+ " \"time_type\": \"SIM\"\n",
+ "}\n",
+ "response = datagen.post(\"/start\", json.dumps(datagen_request),
headers=headers, require_ok=False)\n",
+ "response.json()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "952386f7-8181-4325-972b-5f30dc12cf21",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "display(datagen.get(f\"/jobs\", require_ok=False).json())"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "470b3a2a-4fd9-45a2-9221-497d906f62a9",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# display the first 1k of generated data file\n",
+ "display( datagen.get(f\"/file/sample_data.json\").content[:1024])"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "350faea6-55b0-4386-830c-5160ae495012",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "datagen.post(f\"/stop/sample_custom\",'')"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "77bff054-0f16-4fd5-8ade-2d44b30d0cf2",
+ "metadata": {},
+ "source": [
+ "## Stream generated data\n",
+ "\n",
+ "The data generator works exactly the same whether it is writing data to a
file or publishing messages into a stream. You only need to change the target
configuration.\n",
+ "\n",
+ "To use the Kafka container running on Docker Compose, use the host name
`kafka:9092`. This tutorial uses the KAFKA_HOST environment variable from
Docker Compose to specify the Kafka host. "
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "9959b7c3-6223-479d-b0c2-115a1c555090",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "if os.environ.get('KAFKA_HOST') is None:\n",
+ " kafka_host=f\"kafka:9092\"\n",
+ "else:\n",
+ " kafka_host=f\"{os.environ['KAFKA_HOST']}:9092\""
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "497abc18-6538-4536-a17f-fe10c4367611",
+ "metadata": {},
+ "source": [
+ "The simplest `target` object for Kafka and, similarly, Confluent is:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "686a74ab-e2dd-458e-9e93-10291064e9db",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "target = {\n",
+ " \"type\":\"kafka\",\n",
+ " \"endpoint\": kafka_host,\n",
+ " \"topic\": \"custom_data\"\n",
+ "}\n",
+ "\n",
+ "# Generate 1 hour of real time using the custom configuration. The stream
runs for an hour unless you stop it.\n",
+ "datagen_request = {\n",
+ " \"name\": \"sample_custom\",\n",
+ " \"target\": target,\n",
+ " \"config\": gen_config, \n",
+ " \"time\": \"1h\",\n",
+ " \"concurrency\":10,\n",
+ " \"time_type\": \"REAL\"\n",
+ "}\n",
+ "response = datagen.post(\"/start\", json.dumps(datagen_request),
headers=headers, require_ok=False)\n",
+ "response.json()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "ec17d0c7-a3ab-4f37-bbf0-cc02bff44cf1",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "display(datagen.get(f\"/jobs\", require_ok=False).json())"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "84d7b706-9040-4a69-a956-1b1bbb037c32",
+ "metadata": {},
+ "source": [
+ "### Ingest data from a stream \n",
+ "This example shows how to start a streaming ingestion supervisor in
Apache Druid to consume your custom data:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "51912409-e4e7-48d1-b3a5-b269622b4e56",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "ingestion_spec ={\n",
+ " \"type\": \"kafka\",\n",
+ " \"spec\": {\n",
+ " \"ioConfig\": {\n",
+ " \"type\": \"kafka\",\n",
+ " \"consumerProperties\": {\n",
+ " \"bootstrap.servers\": \"kafka:9092\"\n",
+ " },\n",
+ " \"topic\": \"custom_data\",\n",
+ " \"inputFormat\": {\n",
+ " \"type\": \"json\"\n",
+ " },\n",
+ " \"useEarliestOffset\": True\n",
+ " },\n",
+ " \"tuningConfig\": {\n",
+ " \"type\": \"kafka\",\n",
+ " \"maxRowsInMemory\": 100000,\n",
+ " \"resetOffsetAutomatically\": False\n",
+ " },\n",
+ " \"dataSchema\": {\n",
+ " \"dataSource\": \"custom_data\",\n",
+ " \"timestampSpec\": {\n",
+ " \"column\": \"time\",\n",
+ " \"format\": \"iso\"\n",
+ " },\n",
+ " \"dimensionsSpec\": {\n",
+ " \"dimensions\": [\n",
+ " \"random_string_column\",\n",
+ " {\n",
+ " \"type\": \"long\",\n",
+ " \"name\": \"distributed_number\"\n",
+ " }\n",
+ " ]\n",
+ " },\n",
+ " \"granularitySpec\": {\n",
+ " \"queryGranularity\": \"none\",\n",
+ " \"rollup\": False,\n",
+ " \"segmentGranularity\": \"hour\"\n",
+ " }\n",
+ " }\n",
+ " }\n",
+ "}\n",
+ "\n",
+ "headers = {\n",
+ " 'Content-Type': 'application/json'\n",
+ "}\n",
+ "\n",
+ "druid.rest.post(\"/druid/indexer/v1/supervisor\",
json.dumps(ingestion_spec), headers=headers)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "dddfb1cc-f863-4bf4-8c5a-b261b0b9c2f0",
+ "metadata": {},
+ "source": [
+ "Query the data on the stream, but first wait for its availability. It
takes a bit of time for the streaming tasks to start, but once they are
consuming you can see data very close to real time. Run the following cell
multiple times to see how the data is changing:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "7e1284ed-5c49-4f37-81f7-c3b720473158",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "druid.sql.wait_until_ready('custom_data', verify_load_status=False)\n",
+ "druid.display.sql('''\n",
+ "SELECT SUM(distributed_number) sum_randoms, count(*) total_count\n",
+ "FROM custom_data\n",
+ "''')"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "4486e430-0776-46ad-8a8b-4f0354f17bfb",
+ "metadata": {},
+ "source": [
+ "### Cleanup\n",
+ "\n",
+ "Stop the streaming ingestion and the streaming producer:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "38943a92-dc23-41cf-91a4-1b68d2178033",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "print(f\"Stop streaming generator:
[{datagen.post('/stop/sample_custom','',require_ok=False)}]\")\n",
+ "print(f'Stop streaming ingestion:
[{druid.rest.post(\"/druid/indexer/v1/supervisor/custom_data/terminate\",\"\",
require_ok=False)}]')"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "0cf53bdc-de7f-425d-84b1-68d0cef420d8",
+ "metadata": {},
+ "source": [
+ "Wait for streaming ingestion to complete and then remove the custom data
table:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "87341e7c-f7ab-488c-9913-091f712534cb",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "print(f\"Drop datasource: [{druid.datasources.drop('custom_data')}]\")"
Review Comment:
yeah, druidapi improvement needed.
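
One possible follow-up, sketched here under assumptions and not part of this PR: the notebook resolves `DRUID_HOST` and `KAFKA_HOST` and assembles the `/start` request body by hand in several cells. The helper names below (`service_url`, `datagen_request`) are hypothetical and exist neither in `druidapi` nor in the data generator. The request fields are the ones the notebook already uses. The rule that exactly one of `config` or `config_file` is supplied is this sketch's own assumption, not documented behavior.

```python
import json
import os


def service_url(env_var, default_host, port, scheme="http"):
    """Build a service address from an environment variable.

    Falls back to default_host when the variable is unset or empty.
    Pass scheme="" for addresses without a scheme, such as Kafka endpoints.
    """
    host = os.environ.get(env_var) or default_host
    prefix = f"{scheme}://" if scheme else ""
    return f"{prefix}{host}:{port}"


def datagen_request(name, target, time, time_type, concurrency,
                    config=None, config_file=None):
    """Assemble the JSON body that the notebook posts to /start.

    Assumption of this sketch: exactly one of config or config_file is given.
    """
    if (config is None) == (config_file is None):
        raise ValueError("Provide exactly one of config or config_file")
    request = {
        "name": name,
        "target": target,
        "time": time,
        "time_type": time_type,
        "concurrency": concurrency,
    }
    if config is not None:
        request["config"] = config
    else:
        request["config_file"] = config_file
    return json.dumps(request)
```

In the notebook, the result would be passed to the existing call, for example `datagen.post("/start", datagen_request(...), headers=headers, require_ok=False)`, and `service_url("DRUID_HOST", "router", 8888)` would replace the `if`/`else` host lookup.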
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]