paul-rogers commented on code in PR #13787:
URL: https://github.com/apache/druid/pull/13787#discussion_r1117865694
##########
examples/quickstart/jupyter-notebooks/Python_API_Tutorial.ipynb:
##########
@@ -0,0 +1,1262 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "id": "ce2efaaa",
+ "metadata": {},
+ "source": [
+ "# Tutorial: Learn the Druid Python API\n",
+ "\n",
+ "This notebook provides a quick introduction to the Python wrapper around the [Druid REST API](api-tutorial.ipynb). This notebook assumes you are familiar with the basics of the REST API and the [set of operations that Druid provides](https://druid.apache.org/docs/latest/operations/api-reference.html). Here we focus on using Python to access those APIs rather than on explaining the APIs themselves, which are covered in other notebooks that use the Python API.\n",
+ "\n",
+ "The Druid Python API is primarily intended to help with these notebook tutorials. It can also be used in a regular Python program, as long as the IPython dependencies are available.\n",
+ "\n",
+ "The Druid Python API is a work in progress. We add API wrappers as needed for the notebook tutorials. If you need additional wrappers, feel free to add them and post a PR to Apache Druid with your additions.\n",
+ "\n",
+ "The API provides two levels of functions. Most are simple wrappers around Druid's REST APIs. Others add code to make the API easier to use. The SQL query interface is a prime example: extra code translates a simple SQL query into Druid's `SQLQuery` object and converts the results into a form that can be displayed in a notebook.\n",
+ "\n",
+ "We start by importing the `druidapi` package from the same folder as this notebook. The `styles()` call adds some CSS styles needed to display results."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "id": "6d90ca5d",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "\n",
+ "<style>\n",
+ " .druid table {\n",
+ " border: 1px solid black;\n",
+ " border-collapse: collapse;\n",
+ " }\n",
+ "\n",
+ " .druid th, .druid td {\n",
+ " padding: 4px 1em ;\n",
+ " text-align: left;\n",
+ " }\n",
+ "\n",
+ " td.druid-right, th.druid-right {\n",
+ " text-align: right;\n",
+ " }\n",
+ "\n",
+ " td.druid-center, th.druid-center {\n",
+ " text-align: center;\n",
+ " }\n",
+ "\n",
+ " .druid .druid-left {\n",
+ " text-align: left;\n",
+ " }\n",
+ "\n",
+ " .druid-alert {\n",
+ " color: red;\n",
+ " }\n",
+ "</style>\n"
+ ],
+ "text/plain": [
+ "<IPython.core.display.HTML object>"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "text/html": [
+ "\n",
+ "<style>\n",
+ " .druid table {\n",
+ " border: 1px solid black;\n",
+ " border-collapse: collapse;\n",
+ " }\n",
+ "\n",
+ " .druid th, .druid td {\n",
+ " padding: 4px 1em ;\n",
+ " text-align: left;\n",
+ " }\n",
+ "\n",
+ " td.druid-right, th.druid-right {\n",
+ " text-align: right;\n",
+ " }\n",
+ "\n",
+ " td.druid-center, th.druid-center {\n",
+ " text-align: center;\n",
+ " }\n",
+ "\n",
+ " .druid .druid-left {\n",
+ " text-align: left;\n",
+ " }\n",
+ "\n",
+ " .druid-alert {\n",
+ " color: red;\n",
+ " }\n",
+ "</style>\n"
+ ],
+ "text/plain": [
+ "<IPython.core.display.HTML object>"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "import druidapi\n",
+ "druidapi.styles()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "fb68a838",
+ "metadata": {},
+ "source": [
+ "Next we connect to our cluster by providing the router endpoint. Here we assume the cluster is on your local machine, using the default port. Go ahead and change this if your setup is different.\n",
+ "\n",
+ "The API uses the router to forward messages to each of Druid's services so that we don't have to keep track of the host and port for each service."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "id": "ae601081",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "druid = druidapi.client('http://localhost:8888')"
Review Comment:
There is another step, a cell or two down, that shows how to wait for the
cluster to become ready. We could create a new method that does both of these
steps.
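A minimal sketch of the "wait" half of such a combined method. It assumes that the router's `GET /status/health` endpoint, which answers with the JSON literal `true` when the service is up, is a good enough readiness signal. The names `wait_until_ready` and `router_health_probe` are hypothetical. The sketch uses the standard library's `urllib` rather than `requests` so that it runs without extra packages. A healthy router does not guarantee that every other service is ready, so a real helper might poll more endpoints.

```python
import json
import time
import urllib.request


def router_health_probe(endpoint):
    """Return a zero-argument probe that asks the router whether it is healthy.

    Assumes GET {endpoint}/status/health answers with the JSON literal true.
    """
    def probe():
        with urllib.request.urlopen(endpoint + '/status/health', timeout=5) as resp:
            return json.loads(resp.read()) is True
    return probe


def wait_until_ready(probe, timeout=60.0, interval=1.0):
    """Call probe() until it returns True or `timeout` seconds have passed.

    Connection errors (OSError, which includes urllib's URLError) count as
    "not ready yet", because the router refuses connections while it starts.
    Returns True if the probe succeeded and False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            if probe():
                return True
        except OSError:
            pass
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

A combined helper could then call `druid = druidapi.client(url)` and follow it with `wait_until_ready(router_health_probe(url))`. Taking the probe as a parameter keeps the polling loop testable without a live cluster.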
##########
examples/quickstart/jupyter-notebooks/druidapi/rest.py:
##########
@@ -0,0 +1,180 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import requests
+from .util import dict_get, is_blank
+from urllib.parse import quote
+from .error import ClientError
+
+def check_error(response):
+ """
+ Raises a requests HTTPError if the response code is not OK or Accepted.
+
+ If the response included a JSON payload, then the message is extracted
+ from that payload, else the message is from requests. The JSON
+ payload, if any, is returned in the json field of the error.
Review Comment:
I'm trying to explain Druid's wild & crazy forms of error response. I
reworded the explanation in the hopes it is somewhat clearer.
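For reference, the precedence that `check_error()` applies can be pulled out into a pure function and exercised without a server. This is a sketch only. `extract_error_message` is a hypothetical name, and the payloads in the tests are illustrative shapes, not a complete catalogue of Druid's error formats.

```python
def extract_error_message(payload, fallback):
    """Pick the most useful message from a (possibly missing) Druid error payload.

    Mirrors the order used by check_error(): prefer 'errorMessage', then
    'error'. Blank or non-string values are skipped. If neither key helps,
    return `fallback`, for example the HTTP reason phrase.
    """
    if isinstance(payload, dict):
        for key in ('errorMessage', 'error'):
            msg = payload.get(key)
            if isinstance(msg, str) and msg.strip():
                return msg
    return fallback
```

Keeping this separate from the HTTP handling makes it easy to add another payload shape later without touching the status-code logic.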
##########
examples/quickstart/jupyter-notebooks/druidapi/tasks.py:
##########
@@ -0,0 +1,178 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .consts import OVERLORD_BASE
+
+# Tasks
+REQ_TASKS = OVERLORD_BASE + '/tasks'
+REQ_POST_TASK = OVERLORD_BASE + '/task'
+REQ_GET_TASK = REQ_POST_TASK + '/{}'
+REQ_TASK_STATUS = REQ_GET_TASK + '/status'
+REQ_TASK_REPORTS = REQ_GET_TASK + '/reports'
+REQ_END_TASK = REQ_GET_TASK
+REQ_END_DS_TASKS = REQ_END_TASK + '/shutdownAllTasks'
+
+class TaskClient:
+ """
+ Client for task-related APIs. The APIs connect through the Router to
+ the Overlord.
+ """
+
+ def __init__(self, rest_client):
+ self.client = rest_client
+
+ def tasks(self, state=None, table=None, type=None, max=None, created_time_interval=None):
+ '''
+ Retrieve list of tasks.
+
+ Parameters
+ ----------
+ state : str, default = None
+ Filter list of tasks by task state. Valid options are "running",
+ "complete", "waiting", and "pending". Constants are defined for
+ each of these in the `consts` file.
+ table : str, default = None
+ Return tasks filtered by Druid table (datasource).
+ created_time_interval : str, default = None
+ Return tasks created within the specified interval.
+ max : int, default = None
+ Maximum number of "complete" tasks to return. Only applies when state is set to "complete".
+ type : str, default = None
+ Filter tasks by task type.
+
+ Reference
+ ---------
+ `GET /druid/indexer/v1/tasks`
+
+ See https://druid.apache.org/docs/latest/operations/api-reference.html#get-15
+ '''
+ params = {}
+ if state is not None:
+ params['state'] = state
+ if table is not None:
+ params['datasource'] = table
+ if type is not None:
+ params['type'] = type
+ if max is not None:
+ params['max'] = max
+ if created_time_interval is not None:
+ params['createdTimeInterval'] = created_time_interval
+ return self.client.get_json(REQ_TASKS, params=params)
+
+ def task(self, task_id):
+ """
+ Retrieve the "payload" of a task.
+
+ Parameters
+ ----------
+ task_id : str
+ The id of the task to retrieve
+
+ Reference
+ ---------
+ `GET /druid/indexer/v1/task/{taskId}`
+
+ See https://druid.apache.org/docs/latest/operations/api-reference.html#get-15
+ """
+ return self.client.get_json(REQ_GET_TASK, args=[task_id])
+
+ def task_status(self, task_id):
+ '''
+ Retrieve the status of a task.
+
+ Parameters
+ ----------
+ task_id : str
+ The id of the task to retrieve
+
+ Reference
+ ---------
+ `GET /druid/indexer/v1/task/{taskId}/status`
+
+ See https://druid.apache.org/docs/latest/operations/api-reference.html#get-15
+ '''
+ return self.client.get_json(REQ_TASK_STATUS, args=[task_id])
+
+ def task_reports(self, task_id):
+ '''
+ Retrieve a task completion report for a task.
+ Only works for completed tasks.
Review Comment:
I'll have to check this one. It depends on what the server returns.
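One way to handle this on the client side is to check completion before asking for reports. The sketch below assumes two things: the `task_status()` response nests the task state under `status.statusCode` (falling back to `status.status`), and `SUCCESS` and `FAILED` are the terminal states. The helper names are hypothetical.

```python
# Terminal task states; assumed to match what the Overlord reports.
TERMINAL_STATES = {'SUCCESS', 'FAILED'}


def task_state(status_response):
    """Extract the state string from a task status response, or None."""
    status = (status_response or {}).get('status') or {}
    return status.get('statusCode') or status.get('status')


def is_task_complete(status_response):
    """True once the task has finished, whether it succeeded or failed."""
    return task_state(status_response) in TERMINAL_STATES
```

With a `TaskClient` named `client`, a caller could write `if is_task_complete(client.task_status(task_id)): reports = client.task_reports(task_id)`. This avoids depending on whatever the server returns for reports on a running task.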
##########
examples/quickstart/jupyter-notebooks/druidapi/display.py:
##########
@@ -0,0 +1,84 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+TEXT_TABLE = 0
+HTML_TABLE = 1
+
+class Display:
+
+ def __init__(self):
+ self.format = TEXT_TABLE
+ self.html_initialized = False
+
+ def text(self):
+ self.format = TEXT_TABLE
+
+ def html(self):
+ self.format = HTML_TABLE
+ if not self.html_initialized:
+ from .html_table import styles
+ styles()
+ self.html_initialized = True
+
+ def table(self):
+ if self.format == HTML_TABLE:
+ from .html_table import HtmlTable
Review Comment:
The key reason is that these imports depend on IPython, which is generally
only useful when running in a notebook. By doing the import here, we avoid the
need to import IPython if running in a Python script.
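Here is the same pattern in miniature, as a sketch rather than the actual `Display` code. The `IPython.display` import sits inside the HTML branch, so a plain script that calls the text path never touches IPython. `show_rows` is a hypothetical name. The wrapping `druid` class is chosen to match the `.druid table` CSS that `styles()` emits.

```python
from html import escape


def show_rows(rows, use_html=False):
    """Render a list of dicts as tab-separated text, or display it as HTML.

    The IPython import is deferred into the HTML branch so that scripts
    that only use text output do not need IPython installed.
    """
    if not rows:
        return ''
    cols = list(rows[0].keys())
    if use_html:
        # Deferred, notebook-only import: executed only when HTML is requested.
        from IPython.display import HTML, display
        head = ''.join(f'<th>{escape(str(c))}</th>' for c in cols)
        body = ''.join(
            '<tr>' + ''.join(f'<td>{escape(str(r.get(c, "")))}</td>' for c in cols) + '</tr>'
            for r in rows)
        display(HTML(f'<div class="druid"><table><tr>{head}</tr>{body}</table></div>'))
        return None
    lines = ['\t'.join(str(c) for c in cols)]
    lines += ['\t'.join(str(r.get(c, '')) for c in cols) for r in rows]
    return '\n'.join(lines)
```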
##########
docs/tutorials/tutorial-jupyter-index.md:
##########
@@ -22,51 +22,86 @@ title: "Jupyter Notebook tutorials"
~ under the License.
-->
-<!-- tutorial-jupyter-index.md and examples/quickstart/juptyer-notebooks/README.md share a lot of the same content. If you make a change in one place, update the other too. -->
+<!-- tutorial-jupyter-index.md and examples/quickstart/jupyter-notebooks/README.md
+ share a lot of the same content. If you make a change in one place, update the other
+ too. -->
-You can try out the Druid APIs using the Jupyter Notebook-based tutorials. These tutorials provide snippets of Python code that you can use to run calls against the Druid API to complete the tutorial.
+You can try out the Druid APIs using the Jupyter Notebook-based tutorials. These
+tutorials provide snippets of Python code that you can use to run calls against
+the Druid API to complete the tutorial.
## Prerequisites
Make sure you meet the following requirements before starting the Jupyter-based tutorials:
-- Python 3
+- Python 3
+
+- The `requests` package for Python. For example, you can install it with the following command:
-- The `requests` package for Python. For example, you can install it with the following command:
-
```bash
pip3 install requests
```
-- JupyterLab (recommended) or Jupyter Notebook running on a non-default port. By default, Druid and Jupyter both try to use port `8888,` so start Jupyter on a different port.
+- JupyterLab (recommended) or Jupyter Notebook running on a non-default port. By default, Druid
+ and Jupyter both try to use port `8888`, so start Jupyter on a different port.
- Install JupyterLab or Notebook:
-
- ```bash
- # Install JupyterLab
- pip3 install jupyterlab
- # Install Jupyter Notebook
- pip3 install notebook
- ```
- - Start JupyterLab
-
+
+ ```bash
+ # Install JupyterLab
+ pip3 install jupyterlab
+ # Install Jupyter Notebook
+ pip3 install notebook
+ ```
+ - Start Jupyter using either JupyterLab
+ ```bash
+ # Start JupyterLab on port 3001
+ jupyter lab --port 3001
+ ```
+
+ Or using Jupyter Notebook
```bash
- # Start JupyterLab on port 3001
- jupyter lab --port 3001
- ```
- - Alternatively, start Jupyter Notebook
- ```bash
- # Start Jupyter Notebook on port 3001
- jupyter notebook --port 3001
- ```
+ # Start Jupyter Notebook on port 3001
+ jupyter notebook --port 3001
+ ```
+
+- An available Druid instance. You can use the [Quickstart (local)](./index.md) instance. The tutorials
+ assume that you are using the quickstart, so no authentication or authorization
+ is expected unless explicitly mentioned.
+
+ If you contribute to Druid and work with the Druid integration tests, you can use a test cluster.
+ The following commands assume you have an environment variable, `DRUID_DEV`, that identifies your Druid source repo.
+
+ ```bash
+ cd $DRUID_DEV
+ ./it.sh build
+ ./it.sh image
+ ./it.sh up <category>
+ ```
+
+ Replace `<category>` with one of the available integration test categories. See the integration
+ test `README.md` for details.
+
+## Simple Druid API
-- An available Druid instance. You can use the [Quickstart (local)](./index.md) instance. The tutorials assume that you are using the quickstart, so no authentication or authorization is expected unless explicitly mentioned.
+One of the notebooks shows how to use the Druid REST API. The others focus on other
+topics and use a simple set of Python wrappers around the underlying REST API. The
+wrappers reside in the `druidapi` package within the notebooks directory. While the package
Review Comment:
That's a good longer-term goal. For now, we're putting our toe in the water
by including the code here. From there, we can see if there is broader interest
besides just as a training tool.
##########
examples/quickstart/jupyter-notebooks/druidapi/sql.py:
##########
@@ -0,0 +1,690 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time, requests
+from . import consts, display
+from .consts import ROUTER_BASE
+from .util import is_blank, dict_get
+from .error import DruidError, ClientError
+
+REQ_ROUTER_QUERY = ROUTER_BASE
+REQ_ROUTER_SQL = ROUTER_BASE + '/sql'
+REQ_ROUTER_SQL_TASK = REQ_ROUTER_SQL + '/task'
+
+class SqlRequest:
+
+ def __init__(self, query_client, sql):
+ self.query_client = query_client
+ self.sql = sql
+ self.context = None
+ self.params = None
+ self.header = False
+ self.format = consts.SQL_OBJECT
+ self.headers = None
+ self.types = None
+ self.sqlTypes = None
+
+ def with_format(self, result_format):
+ self.format = result_format
+ return self
+
+ def with_headers(self, sqlTypes=False, druidTypes=False):
+ self.header = True
+ self.types = druidTypes
+ self.sqlTypes = sqlTypes
+ return self
+
+ def with_context(self, context):
+ if self.context is None:
+ self.context = context
+ else:
+ self.context.update(context)
+ return self
+
+ def with_parameters(self, params):
+ '''
+ Set the array of parameters. Parameters must each be a map of 'type'/'value' pairs:
+ {'type': the_type, 'value': the_value}. The type must be a valid SQL type
+ (in upper case). See the consts module for a list.
+ '''
+ if self.params is None:
+ self.params = params
+ else:
+ self.params.extend(params)
+ return self
+
+ def add_parameter(self, value):
+ '''
+ Add one parameter value. Infers the type of the parameter from the Python type.
+ '''
+ if value is None:
+ raise ClientError("Druid does not support null parameter values")
+ data_type = None
+ value_type = type(value)
+ if value_type is str:
+ data_type = consts.SQL_VARCHAR_TYPE
+ elif value_type is int:
+ data_type = consts.SQL_BIGINT_TYPE
+ elif value_type is float:
+ data_type = consts.SQL_DOUBLE_TYPE
+ elif value_type is list:
+ data_type = consts.SQL_ARRAY_TYPE
+ else:
+ raise ClientError("Unsupported value type")
+ if self.params is None:
+ self.params = []
+ self.params.append({'type': data_type, 'value': value})
Review Comment:
Yeah. Python convention is to use single quotes for constant-like things.
But, Java uses double-quotes. So, my muscle memory keeps typing "Java" instead
of "Python". Fixed.
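On the code itself, the type inference in `add_parameter()` reads naturally as a lookup table. In this minimal sketch, `make_parameter` and `SQL_TYPE_FOR` are hypothetical names. The type strings are assumed to be what the `consts.SQL_*_TYPE` constants hold, and `ValueError` stands in for the package's `ClientError`.

```python
# Assumed to match the consts.SQL_*_TYPE values that add_parameter() uses.
SQL_TYPE_FOR = {
    str: 'VARCHAR',
    int: 'BIGINT',
    float: 'DOUBLE',
    list: 'ARRAY',
}


def make_parameter(value):
    """Build one Druid SQL parameter dict, inferring its SQL type from Python.

    Looks up the exact type, as `type(value) is int` does in the original,
    so bool (a subclass of int) is rejected instead of becoming BIGINT.
    """
    if value is None:
        raise ValueError('Druid does not support null parameter values')
    sql_type = SQL_TYPE_FOR.get(type(value))
    if sql_type is None:
        raise ValueError(f'Unsupported value type: {type(value).__name__}')
    return {'type': sql_type, 'value': value}
```

A table keeps the supported types in one place, so adding a type becomes a one-line change rather than another `elif` branch.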
##########
examples/quickstart/jupyter-notebooks/druidapi/rest.py:
##########
@@ -0,0 +1,180 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import requests
+from .util import dict_get, is_blank
+from urllib.parse import quote
+from .error import ClientError
+
+def check_error(response):
+ """
+ Raises a requests HTTPError if the response code is not OK or Accepted.
+
+ If the response included a JSON payload, then the message is extracted
+ from that payload, else the message is from requests. The JSON
+ payload, if any, is returned in the json field of the error.
+ """
+ code = response.status_code
+ if code == requests.codes.ok or code == requests.codes.accepted:
+ return
+ error = None
+ json = None
+ try:
+ json = response.json()
+ except Exception:
+ # If we can't get the JSON, just move on, we'll figure
+ # things out another way.
+ pass
+ msg = dict_get(json, 'errorMessage')
+ if msg is None:
+ msg = dict_get(json, 'error')
+ if not is_blank(msg):
+ raise ClientError(msg)
+ if code == requests.codes.not_found and error is None:
+ error = "Not found"
+ if error is not None:
+ response.reason = error
+ try:
+ response.raise_for_status()
+ except Exception as e:
+ e.json = json
+ raise e
+
+class DruidRestClient:
+ '''
+ Wrapper around the basic Druid REST API operations using the
+ requests Python package. Handles the grunt work of building up
+ URLs, working with JSON, etc.
+ '''
+
+ def __init__(self, endpoint):
+ self.endpoint = endpoint
+ self.trace = False
+ self.session = requests.Session()
+
+ def enable_trace(self, flag=True):
+ self.trace = flag
+
+ def build_url(self, req, args=None) -> str:
+ """
+ Returns the full URL for a REST call given the relative request API and
+ optional parameters to fill placeholders within the request URL.
+
+ Parameters
+ ----------
+ req : str
+ relative URL, with optional {} placeholders
+
+ args : list
+ optional list of values to match {} placeholders
+ in the URL.
+ """
+ url = self.endpoint + req
+ if args is not None:
+ quoted = [quote(arg) for arg in args]
+ url = url.format(*quoted)
+ return url
+
+ def get(self, req, args=None, params=None, require_ok=True) -> requests.Response:
+ '''
+ Generic GET request to this service.
Review Comment:
Added more text to the class description to clarify. The endpoint has a
"base" URL to represent "this" Druid service. All requests are relative to that
base URL.
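Here is a sketch of that base-URL model. It mirrors `build_url()` but makes two changes, offered as suggestions rather than what the PR does. First, it strips a trailing slash from the base, so `'http://host:8888/'` and `'http://host:8888'` behave the same. Second, it passes `safe=''` to `urllib.parse.quote`, so a `/` inside an ID is percent-encoded instead of splitting the path. The default `safe='/'` leaves slashes unescaped.

```python
from urllib.parse import quote


def build_url(endpoint, req, args=None):
    """Join a service base URL with a relative request path.

    Each {} placeholder in `req` is filled from `args`, percent-encoded.
    safe='' also encodes '/', so an ID cannot change the path structure.
    """
    url = endpoint.rstrip('/') + req
    if args is not None:
        url = url.format(*[quote(str(arg), safe='') for arg in args])
    return url
```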
##########
examples/quickstart/jupyter-notebooks/druidapi/datasource.py:
##########
@@ -0,0 +1,111 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import requests, time
+from .consts import COORD_BASE
+from .rest import check_error
+from .util import dict_get
+
+REQ_DATASOURCES = COORD_BASE + '/datasources'
+REQ_DATASOURCE = REQ_DATASOURCES + '/{}'
+
+# Segment load status
+REQ_DATASOURCES = COORD_BASE + '/datasources'
+REQ_DS_LOAD_STATUS = REQ_DATASOURCES + '/{}/loadstatus'
+
+class DatasourceClient:
+ '''
+ Client for status APIs. These APIs are available on all nodes.
+ If used with the router, they report the status of just the router.
+ '''
+
+ def __init__(self, rest_client):
+ self.rest_client = rest_client
+
+ def names(self, include_unused=False, include_disabled=False):
+ """
+ Returns a list of the names of data sources in the metadata store.
+
+ Parameters
+ ----------
+ include_unused : bool, default = False
+ if False, returns only datasources with at least one used segment
+ in the cluster.
+
+ include_disabled : bool, default = False
+ if False, returns only enabled datasources.
+
+ Reference
+ ---------
+ * `GET /druid/coordinator/v1/metadata/datasources`
+ * `GET /druid/coordinator/v1/metadata/datasources?includeUnused`
+ * `GET /druid/coordinator/v1/metadata/datasources?includeDisabled`
+
+ See https://druid.apache.org/docs/latest/operations/api-reference.html#get-4
Review Comment:
Argh... These were created against a previous version of the docs. We
reformatted the page so it looks much better, but we lost the anchors. Fixed to
point to the section headings instead.
##########
examples/quickstart/jupyter-notebooks/druidapi/datasource.py:
##########
@@ -0,0 +1,111 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import requests, time
+from .consts import COORD_BASE
+from .rest import check_error
+from .util import dict_get
+
+REQ_DATASOURCES = COORD_BASE + '/datasources'
+REQ_DATASOURCE = REQ_DATASOURCES + '/{}'
+
+# Segment load status
+REQ_DATASOURCES = COORD_BASE + '/datasources'
+REQ_DS_LOAD_STATUS = REQ_DATASOURCES + '/{}/loadstatus'
+
+class DatasourceClient:
+ '''
+ Client for status APIs. These APIs are available on all nodes.
+ If used with the router, they report the status of just the router.
+ '''
+
+ def __init__(self, rest_client):
+ self.rest_client = rest_client
+
+ def names(self, include_unused=False, include_disabled=False):
+ """
+ Returns a list of the names of data sources in the metadata store.
+
+ Parameters
+ ----------
+ include_unused : bool, default = False
+ if False, returns only datasources with at least one used segment
+ in the cluster.
+
+ include_disabled : bool, default = False
+ if False, returns only enabled datasources.
Review Comment:
Yeah, on some of these I had to guess from the sparse API documentation. For
many, I just cribbed the existing documentation text.
##########
docs/tutorials/tutorial-jupyter-index.md:
##########
@@ -22,50 +22,85 @@ title: "Jupyter Notebook tutorials"
~ under the License.
-->
-<!-- tutorial-jupyter-index.md and examples/quickstart/juptyer-notebooks/README.md share a lot of the same content. If you make a change in one place, update the other too. -->
+<!-- tutorial-jupyter-index.md and examples/quickstart/jupyter-notebooks/README.md
+ share a lot of the same content. If you make a change in one place, update the other
+ too. -->
-You can try out the Druid APIs using the Jupyter Notebook-based tutorials. These tutorials provide snippets of Python code that you can use to run calls against the Druid API to complete the tutorial.
+You can try out the Druid APIs using the Jupyter Notebook-based tutorials. These
+tutorials provide snippets of Python code that you can use to run calls against
+the Druid API to complete the tutorial.
## Prerequisites
Make sure you meet the following requirements before starting the Jupyter-based tutorials:
-- Python 3
+- Python 3
+
+- The `requests` package for Python. For example, you can install it with the following command:
-- The `requests` package for Python. For example, you can install it with the following command:
-
```bash
pip3 install requests
```
-- JupyterLab (recommended) or Jupyter Notebook running on a non-default port. By default, Druid and Jupyter both try to use port `8888,` so start Jupyter on a different port.
+- JupyterLab (recommended) or Jupyter Notebook running on a non-default port. By default, Druid
+ and Jupyter both try to use port `8888`, so start Jupyter on a different port.
- Install JupyterLab or Notebook:
-
- ```bash
- # Install JupyterLab
- pip3 install jupyterlab
- # Install Jupyter Notebook
- pip3 install notebook
- ```
- - Start Jupyter
- - JupyterLab
+
+ ```bash
+ # Install JupyterLab
+ pip3 install jupyterlab
Review Comment:
Great idea!
[Tutorial](https://learnpython.com/blog/python-requirements-file/).
[Reference](https://pip.pypa.io/en/stable/reference/requirements-file-format/).
This will be particularly useful for the coming-soon visualization notebooks
that will have non-trivial dependencies.
##########
examples/quickstart/jupyter-notebooks/druidapi/sql.py:
##########
@@ -0,0 +1,690 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time, requests
+from . import consts, display
+from .consts import ROUTER_BASE
+from .util import is_blank, dict_get
+from .error import DruidError, ClientError
+
+REQ_ROUTER_QUERY = ROUTER_BASE
+REQ_ROUTER_SQL = ROUTER_BASE + '/sql'
+REQ_ROUTER_SQL_TASK = REQ_ROUTER_SQL + '/task'
+
+class SqlRequest:
+
+ def __init__(self, query_client, sql):
+ self.query_client = query_client
+ self.sql = sql
+ self.context = None
+ self.params = None
+ self.header = False
+ self.format = consts.SQL_OBJECT
+ self.headers = None
+ self.types = None
+ self.sqlTypes = None
+
+ def with_format(self, result_format):
+ self.format = result_format
+ return self
+
+ def with_headers(self, sqlTypes=False, druidTypes=False):
+ self.header = True
+ self.types = druidTypes
+ self.sqlTypes = sqlTypes
+ return self
+
+ def with_context(self, context):
+ if self.context is None:
+ self.context = context
+ else:
+ self.context.update(context)
+ return self
+
+ def with_parameters(self, params):
+ '''
+ Set the array of parameters. Parameters must each be a map of 'type'/'value' pairs:
+ {'type': the_type, 'value': the_value}. The type must be a valid SQL type
+ (in upper case). See the consts module for a list.
+ '''
+ if self.params is None:
+ self.params = params
+ else:
+ self.params.extend(params)
+ return self
+
+ def add_parameter(self, value):
+ '''
+ Add one parameter value. Infers the type of the parameter from the Python type.
+ '''
+ if value is None:
+ raise ClientError("Druid does not support null parameter values")
+ data_type = None
+ value_type = type(value)
+ if value_type is str:
+ data_type = consts.SQL_VARCHAR_TYPE
+ elif value_type is int:
+ data_type = consts.SQL_BIGINT_TYPE
+ elif value_type is float:
+ data_type = consts.SQL_DOUBLE_TYPE
+ elif value_type is list:
+ data_type = consts.SQL_ARRAY_TYPE
+ else:
+ raise ClientError("Unsupported value type")
+ if self.params is None:
+ self.params = []
+ self.params.append({'type': data_type, 'value': value})
+
+ def response_header(self):
+ self.header = True
+ return self
+
+ def request_headers(self, headers):
+ self.headers = headers
+ return self
+
+ def to_request(self):
+ query_obj = {"query": self.sql}
+ if self.context is not None and len(self.context) > 0:
+ query_obj['context'] = self.context
+ if self.params is not None and len(self.params) > 0:
+ query_obj['parameters'] = self.params
+ if self.header:
+ query_obj['header'] = True
+ if self.format is not None:
+ query_obj['resultFormat'] = self.format
+ if self.sqlTypes:
+ query_obj['sqlTypesHeader'] = self.sqlTypes
+ if self.types:
+ query_obj['typesHeader'] = self.types
+ return query_obj
+
+ def result_format(self):
+ return self.format.lower()
+
+ def run(self):
+ return self.query_client.sql_query(self)
+
+def parse_rows(fmt, context, results):
+ if fmt == consts.SQL_ARRAY_WITH_TRAILER:
+ rows = results['results']
+ elif fmt == consts.SQL_ARRAY:
+ rows = results
+ else:
+ return results
+ if not context.get('headers', False):
+ return rows
+ header_size = 1
+ if context.get('sqlTypesHeader', False):
+ header_size += 1
+ if context.get('typesHeader', False):
+ header_size += 1
+ return rows[header_size:]
+
+def label_non_null_cols(results):
+ if results is None or len(results) == 0:
+ return []
+ is_null = {}
+ for key in results[0].keys():
+ is_null[key] = True
+ for row in results:
+ for key, value in row.items():
+ if type(value) == str:
+ if value != '':
+ is_null[key] = False
+ elif type(value) in (int, float):
+ if value != 0.0:
+ is_null[key] = False
+ elif value is not None:
+ is_null[key] = False
+ return is_null
+
+def filter_null_cols(results):
+ '''
+ Filter columns from a Druid result set by removing all null-like
+ columns. A column is considered null if all values for that column
+ are null. A value is null if it is either a JSON null, an empty
+ string, or a numeric 0. All rows are preserved, as is the order
+ of the remaining columns.
+ '''
+ if results is None or len(results) == 0:
+ return results
+ is_null = label_non_null_cols(results)
+ revised = []
+ for row in results:
+ new_row = {}
+ for key, value in row.items():
+ if is_null[key]:
+ continue
+ new_row[key] = value
+ revised.append(new_row)
+ return revised
+
+def parse_object_schema(results):
+ schema = []
+ if len(results) == 0:
+ return schema
+ row = results[0]
+ for k, v in row.items():
+ druid_type = None
+ sql_type = None
+ if type(v) is str:
+ druid_type = consts.DRUID_STRING_TYPE
+ sql_type = consts.SQL_VARCHAR_TYPE
+ elif type(v) is int or type(v) is float:
+ druid_type = consts.DRUID_LONG_TYPE
+ sql_type = consts.SQL_BIGINT_TYPE
+ schema.append(ColumnSchema(k, sql_type, druid_type))
+ return schema
+
+def parse_array_schema(context, results):
+ schema = []
+ if len(results) == 0:
+ return schema
+ has_headers = context.get(consts.HEADERS_KEY, False)
+ if not has_headers:
+ return schema
+ has_sql_types = context.get(consts.SQL_TYPES_HEADERS_KEY, False)
+ has_druid_types = context.get(consts.DRUID_TYPE_HEADERS_KEY, False)
+ size = len(results[0])
+ for i in range(size):
+ druid_type = None
+ if has_druid_types:
+ druid_type = results[1][i]
+ sql_type = None
+ if has_sql_types:
+ sql_type = results[2][i]
+ schema.append(ColumnSchema(results[0][i], sql_type, druid_type))
+ return schema
+
+def parse_schema(fmt, context, results):
+ if fmt == consts.SQL_OBJECT:
+ return parse_object_schema(results)
+ elif fmt == consts.SQL_ARRAY or fmt == consts.SQL_ARRAY_WITH_TRAILER:
+ return parse_array_schema(context, results)
+ else:
+ return []
+
+def is_response_ok(http_response):
+ code = http_response.status_code
+ return code == requests.codes.ok or code == requests.codes.accepted
+
+class ColumnSchema:
+
+ def __init__(self, name, sql_type, druid_type):
+ self.name = name
+ self.sql_type = sql_type
+ self.druid_type = druid_type
+
+ def __str__(self):
+ return "{{name={}, SQL type={}, Druid type={}}}".format(self.name, self.sql_type, self.druid_type)
+
+class SqlQueryResult:
+ """
+ Defines the core protocol for Druid SQL queries.
+ """
+
+ def __init__(self, request, response):
+ self.http_response = response
+ self._json = None
+ self._rows = None
+ self._schema = None
+ self.request = request
+ self._error = None
+ self._id = None
+ if not is_response_ok(response):
+ try:
+ self._error = response.json()
+ except Exception:
+ self._error = response.text
+ if self._error is None or len(self._error) == 0:
+ self._error = "Failed with HTTP status {}".format(response.status_code)
+ try:
+ self._id = self.http_response.headers['X-Druid-SQL-Query-Id']
+ except KeyError:
+ self._error = "Query returned no query ID"
+
+ def result_format(self):
+ return self.request.result_format()
+
+ def ok(self):
+ """
+ Reports if the query succeeded.
+
+ The query rows and schema are available only if ok() returns True.
+ """
+ return is_response_ok(self.http_response)
+
+ def error_msg(self):
+ err = self.error()
+ if err is None:
+ return "unknown"
+ if type(err) is str:
+ return err
+ msg = err.get("error")
+ text = err.get("errorMessage")
+ if msg is None and text is None:
+ return "unknown"
+ if msg is None:
+ return text
+ if text is None:
+ return msg
+ return msg + ": " + text
+
+ def id(self):
+ """
+ Returns the unique identifier for the query.
+ """
+ return self._id
+
+ def non_null(self):
+ if not self.ok():
+ return None
+ if self.result_format() != consts.SQL_OBJECT:
+ return None
+ return filter_null_cols(self.rows())
+
+ def as_array(self):
+ if self.result_format() == consts.SQL_OBJECT:
+ rows = []
+ for obj in self.rows():
+ rows.append([v for v in obj.values()])
+ return rows
+ else:
+ return self.rows()
+
+ def error(self):
+ """
+ If the query fails, returns the error provided by Druid, if any.
+ """
+ if self.ok():
+ return None
+ if self._error is not None:
+ return self._error
+ if self.http_response is None:
+ return { "error": "unknown"}
+ if is_response_ok(self.http_response):
+ return None
+ return {"error": "HTTP {}".format(self.http_response.status_code)}
+
+ def json(self):
+ if not self.ok():
+ return None
+ if self._json is None:
+ self._json = self.http_response.json()
+ return self._json
+
+ def rows(self):
+ """
+ Returns the rows of data for the query.
+
+ Druid supports many data formats. The method makes its best
+ attempt to map the format into an array of rows of some sort.
+ """
+ if self._rows is None:
+ json = self.json()
+ if json is None:
+ return self.http_response.text
+ self._rows = parse_rows(self.result_format(), self.request.context, json)
+ return self._rows
+
+ def schema(self):
+ """
+ Returns the data schema as a list of ColumnSchema objects.
+
+ Druid supports many data formats, not all of which provide
+ schema information. This method makes its best attempt to
Review Comment:
Thanks. This is another wacky bit of the present query API. The returned
data format varies depending on options. In order for the Python API to make
sense of the data, it needs a certain set of options. If we ask for, say, a CSV
file, we get data in a form that the rest of the API can't understand.
I think, later, I'll split the Python API to have one way to get data for
consumption in Python, another to get data for download as CSV, etc. That
should simplify the row-parsing code and make it clearer to the user when
certain features are available.
Insert another grumble about not creating a client in parallel with creating
the wild & crazy `/sql` API.
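A minimal sketch of that split, for illustration only: the two function names below are hypothetical and not part of `druidapi`. The request fields (`query`, `resultFormat`, `header`, `context`) are the ones Druid's SQL endpoint (`POST /druid/v2/sql`) accepts.

```python
# Hypothetical helpers sketching the proposed split; not part of druidapi.

def python_query_payload(sql, context=None):
    """Request body for results that the Python API parses itself.

    Pinning resultFormat to "object" means the row-parsing code only
    ever sees a JSON array of objects.
    """
    payload = {"query": sql, "resultFormat": "object"}
    if context:
        payload["context"] = dict(context)
    return payload


def download_query_payload(sql, result_format="csv", header=True, context=None):
    """Request body for results handed to the user as-is (e.g. a CSV download)."""
    payload = {"query": sql, "resultFormat": result_format, "header": header}
    if context:
        payload["context"] = dict(context)
    return payload
```

With this shape, features such as `schema()` or `show()` would only be offered on the Python-facing path, which makes it clear to the user when they are available.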
##########
examples/quickstart/jupyter-notebooks/druidapi/sql.py:
##########
@@ -0,0 +1,690 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time, requests
+from . import consts, display
+from .consts import ROUTER_BASE
+from .util import is_blank, dict_get
+from .error import DruidError, ClientError
+
+REQ_ROUTER_QUERY = ROUTER_BASE
+REQ_ROUTER_SQL = ROUTER_BASE + '/sql'
+REQ_ROUTER_SQL_TASK = REQ_ROUTER_SQL + '/task'
+
+class SqlRequest:
+
+ def __init__(self, query_client, sql):
+ self.query_client = query_client
+ self.sql = sql
+ self.context = None
+ self.params = None
+ self.header = False
+ self.format = consts.SQL_OBJECT
+ self.headers = None
+ self.types = None
+ self.sqlTypes = None
+
+ def with_format(self, result_format):
+ self.format = result_format
+ return self
+
+ def with_headers(self, sqlTypes=False, druidTypes=False):
+ self.header = True
+ self.types = druidTypes
+ self.sqlTypes = sqlTypes
+ return self
+
+ def with_context(self, context):
+ if self.context is None:
+ self.context = context
+ else:
+ self.context.update(context)
+ return self
+
+ def with_parameters(self, params):
+ '''
+ Set the array of parameters. Parameters must each be a map of 'type'/'value' pairs:
+ {'type': the_type, 'value': the_value}. The type must be a valid SQL type
+ (in upper case). See the consts module for a list.
+ if self.params is None:
+ self.params = params
+ else:
+ self.params.extend(params)
+ return self
+
+ def add_parameter(self, value):
+ '''
+ Add one parameter value. Infers the type of the parameter from the Python type.
+ '''
+ if value is None:
+ raise ClientError("Druid does not support null parameter values")
+ data_type = None
+ value_type = type(value)
+ if value_type is str:
+ data_type = consts.SQL_VARCHAR_TYPE
+ elif value_type is int:
+ data_type = consts.SQL_BIGINT_TYPE
+ elif value_type is float:
+ data_type = consts.SQL_DOUBLE_TYPE
+ elif value_type is list:
+ data_type = consts.SQL_ARRAY_TYPE
+ else:
+ raise ClientError("Unsupported value type")
+ if self.params is None:
+ self.params = []
+ self.params.append({'type': data_type, 'value': value})
+
+ def response_header(self):
+ self.header = True
+ return self
+
+ def request_headers(self, headers):
+ self.headers = headers
+ return self
+
+ def to_request(self):
+ query_obj = {"query": self.sql}
+ if self.context is not None and len(self.context) > 0:
+ query_obj['context'] = self.context
+ if self.params is not None and len(self.params) > 0:
+ query_obj['parameters'] = self.params
+ if self.header:
+ query_obj['header'] = True
+ if self.format is not None:
+ query_obj['resultFormat'] = self.format
+ if self.sqlTypes:
+ query_obj['sqlTypesHeader'] = self.sqlTypes
+ if self.types:
+ query_obj['typesHeader'] = self.types
+ return query_obj
+
+ def result_format(self):
+ return self.format.lower()
+
+ def run(self):
+ return self.query_client.sql_query(self)
+
+def parse_rows(fmt, context, results):
+ if fmt == consts.SQL_ARRAY_WITH_TRAILER:
+ rows = results['results']
+ elif fmt == consts.SQL_ARRAY:
+ rows = results
+ else:
+ return results
+ if not context.get('headers', False):
+ return rows
+ header_size = 1
+ if context.get('sqlTypesHeader', False):
+ header_size += 1
+ if context.get('typesHeader', False):
+ header_size += 1
+ return rows[header_size:]
+
+def label_non_null_cols(results):
+ if results is None or len(results) == 0:
+ return []
+ is_null = {}
+ for key in results[0].keys():
+ is_null[key] = True
+ for row in results:
+ for key, value in row.items():
+ if type(value) == str:
+ if value != '':
+ is_null[key] = False
+ elif type(value) == float:
+ if value != 0.0:
+ is_null[key] = False
+ elif value is not None:
+ is_null[key] = False
Review Comment:
This is actually a place where the flexibility of `not` helps. So,
`is_null[key] = not not value`.
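One way to apply the truthiness idea, sketched under the assumption that the goal is the behavior the `filter_null_cols` docstring describes (JSON null, empty string and numeric 0 all count as null). A plain `is_null[key] = not not value` inside the loop would let a later null-like value overwrite an earlier non-null one, so this sketch accumulates with `or`, and it returns non-null flags to match the function's name. Truthiness also treats `False` and empty arrays as null-like, which may or may not be wanted.

```python
def label_non_null_cols(results):
    """Map each column name to True if at least one row has a non-null-like value."""
    non_null = {}
    for row in results or []:
        for key, value in row.items():
            # Truthiness treats None, '', 0 and 0.0 as null-like.
            # Using `or` keeps an earlier non-null value from being
            # overwritten by a later null-like one.
            non_null[key] = non_null.get(key, False) or bool(value)
    return non_null


def filter_null_cols(results):
    """Drop columns whose values are all null-like; keep all rows and column order."""
    if not results:
        return results
    non_null = label_non_null_cols(results)
    return [{k: v for k, v in row.items() if non_null[k]} for row in results]
```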
##########
examples/quickstart/jupyter-notebooks/druidapi/sql.py:
##########
@@ -0,0 +1,690 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time, requests
+from . import consts, display
+from .consts import ROUTER_BASE
+from .util import is_blank, dict_get
+from .error import DruidError, ClientError
+
+REQ_ROUTER_QUERY = ROUTER_BASE
+REQ_ROUTER_SQL = ROUTER_BASE + '/sql'
+REQ_ROUTER_SQL_TASK = REQ_ROUTER_SQL + '/task'
+
+class SqlRequest:
+
+ def __init__(self, query_client, sql):
+ self.query_client = query_client
+ self.sql = sql
+ self.context = None
+ self.params = None
+ self.header = False
+ self.format = consts.SQL_OBJECT
+ self.headers = None
+ self.types = None
+ self.sqlTypes = None
+
+ def with_format(self, result_format):
+ self.format = result_format
+ return self
+
+ def with_headers(self, sqlTypes=False, druidTypes=False):
+ self.header = True
+ self.types = druidTypes
+ self.sqlTypes = sqlTypes
+ return self
+
+ def with_context(self, context):
+ if self.context is None:
+ self.context = context
+ else:
+ self.context.update(context)
+ return self
+
+ def with_parameters(self, params):
+ '''
+ Set the array of parameters. Parameters must each be a map of 'type'/'value' pairs:
+ {'type': the_type, 'value': the_value}. The type must be a valid SQL type
+ (in upper case). See the consts module for a list.
+ '''
+ if self.params is None:
+ self.params = params
+ else:
+ self.params.extend(params)
+ return self
+
+ def add_parameter(self, value):
+ '''
+ Add one parameter value. Infers the type of the parameter from the Python type.
+ '''
+ if value is None:
+ raise ClientError("Druid does not support null parameter values")
+ data_type = None
+ value_type = type(value)
+ if value_type is str:
+ data_type = consts.SQL_VARCHAR_TYPE
+ elif value_type is int:
+ data_type = consts.SQL_BIGINT_TYPE
+ elif value_type is float:
+ data_type = consts.SQL_DOUBLE_TYPE
+ elif value_type is list:
+ data_type = consts.SQL_ARRAY_TYPE
+ else:
+ raise ClientError("Unsupported value type")
+ if self.params is None:
+ self.params = []
+ self.params.append({'type': data_type, 'value': value})
+
+ def response_header(self):
+ self.header = True
+ return self
+
+ def request_headers(self, headers):
+ self.headers = headers
+ return self
+
+ def to_request(self):
+ query_obj = {"query": self.sql}
+ if self.context is not None and len(self.context) > 0:
+ query_obj['context'] = self.context
+ if self.params is not None and len(self.params) > 0:
+ query_obj['parameters'] = self.params
+ if self.header:
+ query_obj['header'] = True
+ if self.format is not None:
+ query_obj['resultFormat'] = self.format
+ if self.sqlTypes:
+ query_obj['sqlTypesHeader'] = self.sqlTypes
+ if self.types:
+ query_obj['typesHeader'] = self.types
+ return query_obj
+
+ def result_format(self):
+ return self.format.lower()
+
+ def run(self):
+ return self.query_client.sql_query(self)
+
+def parse_rows(fmt, context, results):
+ if fmt == consts.SQL_ARRAY_WITH_TRAILER:
+ rows = results['results']
+ elif fmt == consts.SQL_ARRAY:
+ rows = results
+ else:
+ return results
+ if not context.get('headers', False):
+ return rows
+ header_size = 1
+ if context.get('sqlTypesHeader', False):
+ header_size += 1
+ if context.get('typesHeader', False):
+ header_size += 1
+ return rows[header_size:]
+
+def label_non_null_cols(results):
+ if results is None or len(results) == 0:
+ return []
+ is_null = {}
+ for key in results[0].keys():
+ is_null[key] = True
+ for row in results:
+ for key, value in row.items():
+ if type(value) == str:
+ if value != '':
+ is_null[key] = False
+ elif type(value) == float:
+ if value != 0.0:
+ is_null[key] = False
+ elif value is not None:
+ is_null[key] = False
+ return is_null
+
+def filter_null_cols(results):
+ '''
+ Filter columns from a Druid result set by removing all null-like
+ columns. A column is considered null if all values for that column
+ are null. A value is null if it is either a JSON null, an empty
+ string, or a numeric 0. All rows are preserved, as is the order
+ of the remaining columns.
+ '''
+ if results is None or len(results) == 0:
+ return results
+ is_null = label_non_null_cols(results)
+ revised = []
+ for row in results:
+ new_row = {}
+ for key, value in row.items():
+ if is_null[key]:
+ continue
+ new_row[key] = value
+ revised.append(new_row)
+ return revised
+
+def parse_object_schema(results):
+ schema = []
+ if len(results) == 0:
+ return schema
+ row = results[0]
+ for k, v in row.items():
+ druid_type = None
+ sql_type = None
+ if type(v) is str:
+ druid_type = consts.DRUID_STRING_TYPE
+ sql_type = consts.SQL_VARCHAR_TYPE
+ elif type(v) is int or type(v) is float:
+ druid_type = consts.DRUID_LONG_TYPE
+ sql_type = consts.SQL_BIGINT_TYPE
+ schema.append(ColumnSchema(k, sql_type, druid_type))
+ return schema
+
+def parse_array_schema(context, results):
+ schema = []
+ if len(results) == 0:
+ return schema
+ has_headers = context.get(consts.HEADERS_KEY, False)
+ if not has_headers:
+ return schema
+ has_sql_types = context.get(consts.SQL_TYPES_HEADERS_KEY, False)
+ has_druid_types = context.get(consts.DRUID_TYPE_HEADERS_KEY, False)
+ size = len(results[0])
+ for i in range(size):
+ druid_type = None
+ if has_druid_types:
+ druid_type = results[1][i]
+ sql_type = None
+ if has_sql_types:
+ sql_type = results[2][i]
+ schema.append(ColumnSchema(results[0][i], sql_type, druid_type))
+ return schema
+
+def parse_schema(fmt, context, results):
+ if fmt == consts.SQL_OBJECT:
+ return parse_object_schema(results)
+ elif fmt == consts.SQL_ARRAY or fmt == consts.SQL_ARRAY_WITH_TRAILER:
+ return parse_array_schema(context, results)
+ else:
+ return []
+
+def is_response_ok(http_response):
+ code = http_response.status_code
+ return code == requests.codes.ok or code == requests.codes.accepted
+
+class ColumnSchema:
+
+ def __init__(self, name, sql_type, druid_type):
+ self.name = name
+ self.sql_type = sql_type
+ self.druid_type = druid_type
+
+ def __str__(self):
+ return "{{name={}, SQL type={}, Druid type={}}}".format(self.name, self.sql_type, self.druid_type)
+
+class SqlQueryResult:
+ """
+ Defines the core protocol for Druid SQL queries.
+ """
+
+ def __init__(self, request, response):
+ self.http_response = response
+ self._json = None
+ self._rows = None
+ self._schema = None
+ self.request = request
+ self._error = None
+ self._id = None
+ if not is_response_ok(response):
+ try:
+ self._error = response.json()
+ except Exception:
+ self._error = response.text
+ if self._error is None or len(self._error) == 0:
+ self._error = "Failed with HTTP status {}".format(response.status_code)
+ try:
+ self._id = self.http_response.headers['X-Druid-SQL-Query-Id']
+ except KeyError:
+ self._error = "Query returned no query ID"
+
+ def result_format(self):
+ return self.request.result_format()
+
+ def ok(self):
+ """
+ Reports if the query succeeded.
+
+ The query rows and schema are available only if ok() returns True.
+ """
+ return is_response_ok(self.http_response)
+
+ def error_msg(self):
+ err = self.error()
+ if err is None:
+ return "unknown"
+ if type(err) is str:
+ return err
+ msg = err.get("error")
+ text = err.get("errorMessage")
+ if msg is None and text is None:
+ return "unknown"
+ if msg is None:
+ return text
+ if text is None:
+ return msg
+ return msg + ": " + text
+
+ def id(self):
+ """
+ Returns the unique identifier for the query.
+ """
+ return self._id
+
+ def non_null(self):
+ if not self.ok():
+ return None
+ if self.result_format() != consts.SQL_OBJECT:
+ return None
+ return filter_null_cols(self.rows())
+
+ def as_array(self):
+ if self.result_format() == consts.SQL_OBJECT:
+ rows = []
+ for obj in self.rows():
+ rows.append([v for v in obj.values()])
+ return rows
+ else:
+ return self.rows()
+
+ def error(self):
+ """
+ If the query fails, returns the error provided by Druid, if any.
+ """
+ if self.ok():
+ return None
+ if self._error is not None:
+ return self._error
+ if self.http_response is None:
+ return { "error": "unknown"}
+ if is_response_ok(self.http_response):
+ return None
+ return {"error": "HTTP {}".format(self.http_response.status_code)}
+
+ def json(self):
+ if not self.ok():
+ return None
+ if self._json is None:
+ self._json = self.http_response.json()
+ return self._json
+
+ def rows(self):
+ """
+ Returns the rows of data for the query.
+
+ Druid supports many data formats. The method makes its best
+ attempt to map the format into an array of rows of some sort.
+ """
+ if self._rows is None:
+ json = self.json()
+ if json is None:
+ return self.http_response.text
+ self._rows = parse_rows(self.result_format(), self.request.context, json)
+ return self._rows
+
+ def schema(self):
+ """
+ Returns the data schema as a list of ColumnSchema objects.
+
+ Druid supports many data formats, not all of which provide
+ schema information. This method makes its best attempt to
+ extract the schema from the query results.
+ """
+ if self._schema is None:
+ self._schema = parse_schema(self.result_format(), self.request.context, self.json())
+ return self._schema
+
+ def show(self, non_null=False):
+ data = None
+ if non_null:
+ data = self.non_null()
+ if data is None:
+ data = self.as_array()
+ if data is None or len(data) == 0:
+ display.display.show_message("Query returned no results")
+ return
+ disp = display.display.table()
+ disp.headers([c.name for c in self.schema()])
+ disp.show(data)
+
+ def show_schema(self):
+ disp = display.display.table()
+ disp.headers(['Name', 'SQL Type', 'Druid Type'])
+ data = []
+ for c in self.schema():
+ data.append([c.name, c.sql_type, c.druid_type])
+ disp.show(data)
+
+class QueryTaskResult:
+
+ def __init__(self, request, response):
+ self._request = request
+ self.http_response = response
+ self._status = None
+ self._results = None
+ self._details = None
+ self._schema = None
+ self._rows = None
+ self._reports = None
+ self._schema = None
+ self._results = None
+ self._error = None
+ self._id = None
+ if not is_response_ok(response):
+ self._state = consts.FAILED_STATE
+ try:
+ self._error = response.json()
+ except Exception:
+ self._error = response.text
+ if self._error is None or len(self._error) == 0:
+ self._error = "Failed with HTTP status {}".format(response.status_code)
+ return
+
+ # Typical response:
+ # {'taskId': '6f7b514a446d4edc9d26a24d4bd03ade_fd8e242b-7d93-431d-b65b-2a512116924c_bjdlojgj',
+ # 'state': 'RUNNING'}
+ self.response_obj = response.json()
+ self._id = self.response_obj['taskId']
+ self._state = self.response_obj['state']
+
+ def ok(self):
+ """
+ Reports if the query succeeded.
+
+ The query rows and schema are available only if ok() returns True.
+ """
+ return self._error is None
+
+ def id(self):
+ return self._id
+
+ def _tasks(self):
+ return self._request.query_client.druid_client.tasks()
+
+ def status(self):
+ """
+ Polls Druid for an update on the query run status.
+ """
+ self.check_valid()
+ # Example:
+ # {'task': 'talaria-sql-w000-b373b68d-2675-4035-b4d2-7a9228edead6',
+ # 'status': {
+ # 'id': 'talaria-sql-w000-b373b68d-2675-4035-b4d2-7a9228edead6',
+ # 'groupId': 'talaria-sql-w000-b373b68d-2675-4035-b4d2-7a9228edead6',
+ # 'type': 'talaria0', 'createdTime': '2022-04-28T23:19:50.331Z',
+ # 'queueInsertionTime': '1970-01-01T00:00:00.000Z',
+ # 'statusCode': 'RUNNING', 'status': 'RUNNING', 'runnerStatusCode': 'PENDING',
+ # 'duration': -1, 'location': {'host': None, 'port': -1, 'tlsPort': -1},
+ # 'dataSource': 'w000', 'errorMsg': None}}
+ self._status = self._tasks().task_status(self._id)
+ self._state = self._status['status']['status']
+ if self._state == consts.FAILED_STATE:
+ self._error = self._status['status']['errorMsg']
+ return self._status
+
+ def done(self):
+ """
+ Reports if the query is done: succeeded or failed.
+ """
+ return self._state == consts.FAILED_STATE or self._state == consts.SUCCESS_STATE
+
+ def succeeded(self):
+ """
+ Reports if the query succeeded.
+ """
+ return self._state == consts.SUCCESS_STATE
+
+ def state(self):
+ """
+ Reports the engine-specific query state.
+
+ Updated after each call to status().
+ """
+ return self._state
+
+ def error(self):
+ return self._error
+
+ def error_msg(self):
+ err = self.error()
+ if err is None:
+ return "unknown"
+ if type(err) is str:
+ return err
+ msg = dict_get(err, "error")
+ text = dict_get(err, "errorMessage")
+ if msg is None and text is None:
+ return "unknown"
+ if text is not None:
+ text = text.replace('\\n', '\n')
+ if msg is None:
+ return text
+ if text is None:
+ return msg
+ return msg + ": " + text
+
+ def join(self):
+ if not self.done():
+ self.status()
+ while not self.done():
+ time.sleep(0.5)
+ self.status()
+ return self.succeeded()
+
+ def check_valid(self):
+ if self._id is None:
+ raise ClientError("Operation is invalid on a failed query")
+
+ def wait_done(self):
+ if not self.join():
+ raise DruidError("Query failed: " + self.error_msg())
+
+ def wait(self):
+ self.wait_done()
+ return self.rows()
+
+ def reports(self) -> dict:
+ self.check_valid()
+ if self._reports is None:
+ self.join()
+ self._reports = self._tasks().task_reports(self._id)
+ return self._reports
+
+ def results(self):
+ if self._results is None:
+ rpts = self.reports()
+ self._results = rpts['multiStageQuery']['payload']['results']
+ return self._results
+
+ def schema(self):
+ if self._schema is None:
+ results = self.results()
+ sig = results['signature']
+ sqlTypes = results['sqlTypeNames']
+ size = len(sig)
+ self._schema = []
+ for i in range(size):
+ self._schema.append(ColumnSchema(sig[i]['name'], sqlTypes[i], sig[i]['type']))
+ return self._schema
+
+ def rows(self):
+ if self._rows is None:
+ results = self.results()
+ self._rows = results['results']
+ return self._rows
+
+ def show(self, non_null=False):
+ data = self.rows()
+ if non_null:
+ data = filter_null_cols(data)
+ disp = display.display.table()
+ disp.headers([c.name for c in self.schema()])
+ disp.show(data)
+
+class QueryClient:
+
+ def __init__(self, druid, rest_client=None):
+ self.druid_client = druid
+ self._rest_client = druid.rest_client if rest_client is None else rest_client
+
+ def rest_client(self):
+ return self._rest_client
+
+ def _prepare_query(self, request):
+ if request is None:
+ raise ClientError("No query provided.")
+ if type(request) == str:
+ request = self.sql_request(request)
+ if is_blank(request.sql):
+ raise ClientError("No query provided.")
+ if self.rest_client().trace:
+ print(request.sql)
+ query_obj = request.to_request()
+ return (request, query_obj)
+
+ def sql_query(self, request) -> SqlQueryResult:
+ '''
+ Submit a SQL query with control over the context, parameters and other
+ options. Returns a response with either a detailed error message, or
+ the rows and query ID.
Review Comment:
Reworded. Again, it is messy because rows are available in Python format
only for some data formats. Otherwise, the raw result has to be parsed as CSV
or whatever.
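For formats where rows don't come back as JSON, the caller has to parse the raw body. A minimal sketch for `resultFormat: "csv"` with `header: true`, using only the standard `csv` module. The function name is hypothetical, and the sketch assumes no `typesHeader`/`sqlTypesHeader` rows (those would add extra rows after the column names). All values come back as strings.

```python
import csv
import io


def parse_csv_rows(text, header=True):
    """Parse a Druid SQL CSV response body into a list of dicts (or lists if header=False)."""
    # csv.reader handles quoted fields, including embedded commas and newlines.
    # It yields [] for blank lines, such as a trailing empty line, so drop those.
    rows = [row for row in csv.reader(io.StringIO(text)) if row]
    if not header:
        return rows
    if not rows:
        return []
    names, data = rows[0], rows[1:]
    return [dict(zip(names, row)) for row in data]
```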
##########
docs/tutorials/tutorial-jupyter-index.md:
##########
@@ -22,51 +22,86 @@ title: "Jupyter Notebook tutorials"
~ under the License.
-->
-<!-- tutorial-jupyter-index.md and examples/quickstart/juptyer-notebooks/README.md share a lot of the same content. If you make a change in one place, update the other too. -->
+<!-- tutorial-jupyter-index.md and examples/quickstart/jupyter-notebooks/README.md
+ share a lot of the same content. If you make a change in one place, update the other
+ too. -->
-You can try out the Druid APIs using the Jupyter Notebook-based tutorials. These tutorials provide snippets of Python code that you can use to run calls against the Druid API to complete the tutorial.
+You can try out the Druid APIs using the Jupyter Notebook-based tutorials. These
+tutorials provide snippets of Python code that you can use to run calls against
+the Druid API to complete the tutorial.
## Prerequisites
Make sure you meet the following requirements before starting the Jupyter-based tutorials:
-- Python 3
+- Python 3
Review Comment:
Adding a version means we've tested at that version. I'm using 3.9. Let's go
with 3.7 for now until we do the actual testing.
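If the notebooks want to fail fast on an older interpreter, a guard along these lines could go in the first cell. The 3.7 floor comes from the comment above and remains an assumption until the actual testing is done; the helper name is hypothetical.

```python
import sys

# Assumed minimum version, per the review discussion; not yet verified by testing.
MIN_PYTHON = (3, 7)


def python_version_ok(version_info=sys.version_info, minimum=MIN_PYTHON):
    """Return True if the (major, minor) version meets the minimum."""
    return tuple(version_info[:2]) >= minimum
```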
##########
examples/quickstart/jupyter-notebooks/Python_API_Tutorial.ipynb:
##########
@@ -0,0 +1,1281 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "id": "ce2efaaa",
+ "metadata": {},
+ "source": [
+ "# Learn the Druid Python API\n",
+ "\n",
+ "<!--\n",
+ " ~ Licensed to the Apache Software Foundation (ASF) under one\n",
+ " ~ or more contributor license agreements. See the NOTICE file\n",
+ " ~ distributed with this work for additional information\n",
+ " ~ regarding copyright ownership. The ASF licenses this file\n",
+ " ~ to you under the Apache License, Version 2.0 (the\n",
+ " ~ \"License\"); you may not use this file except in compliance\n",
+ " ~ with the License. You may obtain a copy of the License at\n",
+ " ~\n",
+ " ~ http://www.apache.org/licenses/LICENSE-2.0\n",
+ " ~\n",
+ " ~ Unless required by applicable law or agreed to in writing,\n",
+ " ~ software distributed under the License is distributed on an\n",
+ " ~ \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ " ~ KIND, either express or implied. See the License for the\n",
+ " ~ specific language governing permissions and limitations\n",
+ " ~ under the License.\n",
+ " -->\n",
+ "\n",
+ "This notebook provides a quick introduction to the Python wrapper around the [Druid REST API](api-tutorial.ipynb). This notebook assumes you are familiar with the basics of the REST API and the [set of operations that Druid provides](https://druid.apache.org/docs/latest/operations/api-reference.html). This tutorial focuses on using Python to access those APIs rather than explaining the APIs themselves. The APIs themselves are covered in other notebooks that use the Python API.\n",
+ "\n",
+ "The Druid Python API is primarily intended to help with these notebook tutorials. It can also be used in your own ad-hoc notebooks, or in a regular Python program.\n",
+ "\n",
+ "The Druid Python API is a work in progress. The Druid team adds API wrappers as needed for the notebook tutorials. If you find you need additional wrappers, feel free to add them and post a PR to Apache Druid with your additions.\n",
+ "\n",
+ "The API provides two levels of functions. Most are simple wrappers around Druid's REST APIs. Others add extra code to make the API easier to use. The SQL query interface is a prime example: extra code translates a simple SQL query into Druid's `SQLQuery` object and interprets the results into a form that can be displayed in a notebook.\n",
+ "\n",
+ "Start by importing the `druidapi` package from the same folder as this notebook. The `styles()` call adds some CSS styles needed to display results."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "id": "6d90ca5d",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "\n",
+ "<style>\n",
+ " .druid table {\n",
+ " border: 1px solid black;\n",
+ " border-collapse: collapse;\n",
+ " }\n",
+ "\n",
+ " .druid th, .druid td {\n",
+ " padding: 4px 1em ;\n",
+ " text-align: left;\n",
+ " }\n",
+ "\n",
+ " td.druid-right, th.druid-right {\n",
+ " text-align: right;\n",
+ " }\n",
+ "\n",
+ " td.druid-center, th.druid-center {\n",
+ " text-align: center;\n",
+ " }\n",
+ "\n",
+ " .druid .druid-left {\n",
+ " text-align: left;\n",
+ " }\n",
+ "\n",
+ " .druid-alert {\n",
+ " color: red;\n",
+ " }\n",
+ "</style>\n"
+ ],
+ "text/plain": [
+ "<IPython.core.display.HTML object>"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "text/html": [
+ "\n",
+ "<style>\n",
+ " .druid table {\n",
+ " border: 1px solid black;\n",
+ " border-collapse: collapse;\n",
+ " }\n",
+ "\n",
+ " .druid th, .druid td {\n",
+ " padding: 4px 1em ;\n",
+ " text-align: left;\n",
+ " }\n",
+ "\n",
+ " td.druid-right, th.druid-right {\n",
+ " text-align: right;\n",
+ " }\n",
+ "\n",
+ " td.druid-center, th.druid-center {\n",
+ " text-align: center;\n",
+ " }\n",
+ "\n",
+ " .druid .druid-left {\n",
+ " text-align: left;\n",
+ " }\n",
+ "\n",
+ " .druid-alert {\n",
+ " color: red;\n",
+ " }\n",
+ "</style>\n"
+ ],
+ "text/plain": [
+ "<IPython.core.display.HTML object>"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "import druidapi\n",
+ "druidapi.styles()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "fb68a838",
+ "metadata": {},
+ "source": [
+ "Next, connect to your cluster by providing the router endpoint. The code assumes the cluster is on your local machine, using the default port. Go ahead and change this if your setup is different.\n",
+ "\n",
+ "The API uses the router to forward messages to each of Druid's services so that you don't have to keep track of the host and port for each service."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "id": "ae601081",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "druid = druidapi.client('http://localhost:8888')"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "8b4e774b",
+ "metadata": {},
+ "source": [
+ "## Status Client\n",
+ "\n",
+ "The SDK groups Druid REST API calls into categories, with a client for each. Start with the status client."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "id": "ff16fc3b",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "status_client = druid.status()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "be992774",
+ "metadata": {},
+ "source": [
+ "Use the Python `help()` function to learn what methods are available."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "id": "03f26417",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Help on StatusClient in module druidapi.status object:\n",
+ "\n",
+ "class StatusClient(builtins.object)\n",
+ " | StatusClient(rest_client)\n",
+ " | \n",
+ " | Client for status APIs. These APIs are available on all nodes.\n",
+ " | If used with the router, they report the status of just the router.\n",
+ " | \n",
+ " | Methods defined here:\n",
+ " | \n",
+ " | __init__(self, rest_client)\n",
+ " | Initialize self. See help(type(self)) for accurate signature.\n",
+ " | \n",
+ " | brokers(self)\n",
+ " | \n",
+ " | in_cluster(self)\n",
+ " | Returns `True` if the node is visible within the cluster, `False` if not.\n",
+ " | (That is, returns the value of the `{\"selfDiscovered\": true/false}`\n",
+ " | field in the response.)\n",
+ " | \n",
+ " | GET `/status/selfDiscovered/status`\n",
+ " | \n",
+ " | See https://druid.apache.org/docs/latest/operations/api-reference.html#process-information\n",
+ " | \n",
+ " | is_healthy(self) -> bool\n",
+ " | Returns `True` if the node is healthy, an exception otherwise.\n",
+ " | Useful for automated health checks.\n",
+ " | \n",
+ " | GET `/status/health`\n",
+ " | \n",
+ " | See https://druid.apache.org/docs/latest/operations/api-reference.html#process-information\n",
+ " | \n",
+ " | properties(self) -> map\n",
+ " | Returns the effective set of Java properties used by the service, including\n",
+ " | system properties and properties from the `common.runtime.properties` and\n",
+ " | `runtime.properties` files.\n",
+ " | \n",
+ " | GET `/status/properties`\n",
+ " | \n",
+ " | See https://druid.apache.org/docs/latest/operations/api-reference.html#process-information\n",
+ " | \n",
+ " | status(self)\n",
+ " | Returns the Druid version, loaded extensions, memory used, total memory \n",
+ " | and other useful information about the process.\n",
+ " | \n",
+ " | GET `/status`\n",
+ " | \n",
+ " | See https://druid.apache.org/docs/latest/operations/api-reference.html#process-information\n",
+ " | \n",
+ " | version(self)\n",
+ " | \n",
+ " | wait_until_ready(self)\n",
+ " | \n",
+ " | ----------------------------------------------------------------------\n",
+ " | Data descriptors defined here:\n",
+ " | \n",
+ " | __dict__\n",
+ " | dictionary for instance variables (if defined)\n",
+ " | \n",
+ " | __weakref__\n",
+ " | list of weak references to the object (if defined)\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "help(status_client)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "70f3d578",
+ "metadata": {},
+ "source": [
+ "Druid servers return unexpected results if you make REST calls while Druid starts up. The following call waits until the server is ready. If you forgot to start your server, or the URL above is wrong, this call hangs forever. Use the Kernel → Interrupt command to break out of the function. (Or, start your server. If your server refuses to start, this Jupyter Notebook may be running on port 8888. See the [README](README.md) for how to start on a different port.)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "id": "114ed0d1",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "status_client.wait_until_ready()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "e803c9fe",
+ "metadata": {},
+ "source": [
+ "Check the version of your cluster. Some of these notebooks illustrate newer features available only on specific versions of Druid."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "id": "2faa0d81",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "'26.0.0-SNAPSHOT'"
+ ]
+ },
+ "execution_count": 6,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "status_client.version()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "d78a6c35",
+ "metadata": {},
+ "source": [
+ "You can also check which extensions are loaded in your cluster. Some notebooks require specific extensions to be available."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "id": "1001f412",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "'[\"druid-hdfs-storage\", \"druid-kafka-indexing-service\", \"druid-datasketches\", \"druid-multi-stage-query\", \"druid-lookups-cached-global\", \"druid-catalog\"]'"
+ ]
+ },
+ "execution_count": 7,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "status_client.properties()['druid.extensions.loadList']"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "8825ca39",
+ "metadata": {},
+ "source": [
+ "## SQL Client\n",
+ "\n",
+ "Running SQL queries in a notebook is easy. Here is an example of how to run a query and display results. The [pydruid](https://pythonhosted.org/pydruid/) library provides a robust way to run native queries, to run SQL queries, and to convert the results to various formats. Here the goal is just to interact with Druid."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "id": "6be0c745",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "sql_client = druid.sql()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "d051bc5e",
+ "metadata": {},
+ "source": [
+ "Start by getting a list of schemas."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 10,
+ "id": "dd8387e0",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<div class=\"druid\"><table>\n",
+ "<tr><th>SchemaName</th></tr>\n",
+ "<tr><td>INFORMATION_SCHEMA</td></tr>\n",
+ "<tr><td>druid</td></tr>\n",
+ "<tr><td>ext</td></tr>\n",
+ "<tr><td>lookup</td></tr>\n",
+ "<tr><td>sys</td></tr>\n",
+ "<tr><td>view</td></tr>\n",
+ "</table></div>"
+ ],
+ "text/plain": [
+ "<IPython.core.display.HTML object>"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "sql_client.show_schemas()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "b8261ab0",
+ "metadata": {},
+ "source": [
+ "Then, retrieve the tables (or datasources) within any schema."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 11,
+ "id": "64dcb46a",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<div class=\"druid\"><table>\n",
+ "<tr><th>TableName</th></tr>\n",
+ "<tr><td>COLUMNS</td></tr>\n",
+ "<tr><td>PARAMETERS</td></tr>\n",
+ "<tr><td>SCHEMATA</td></tr>\n",
+ "<tr><td>TABLES</td></tr>\n",
+ "</table></div>"
+ ],
+ "text/plain": [
+ "<IPython.core.display.HTML object>"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "sql_client.show_tables('INFORMATION_SCHEMA')"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "ff311595",
+ "metadata": {},
+ "source": [
+ "With no arguments, `show_tables()` lists the datasources (the tables in the `druid` schema). You'll get an empty result if you have no datasources yet."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 12,
+ "id": "616770ce",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<div class=\"druid\"><table>\n",
+ "<tr><th>TableName</th></tr>\n",
+ "<tr><td>myWiki</td></tr>\n",
+ "<tr><td>myWiki3</td></tr>\n",
+ "</table></div>"
+ ],
+ "text/plain": [
+ "<IPython.core.display.HTML object>"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "sql_client.show_tables()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "7392e484",
+ "metadata": {},
+ "source": [
+ "You can easily run a query and show the results:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 14,
+ "id": "2c649eef",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<div class=\"druid\"><table>\n",
+ "<tr><th>TABLE_NAME</th></tr>\n",
+ "<tr><td>COLUMNS</td></tr>\n",
+ "<tr><td>PARAMETERS</td></tr>\n",
+ "<tr><td>SCHEMATA</td></tr>\n",
+ "<tr><td>TABLES</td></tr>\n",
+ "</table></div>"
+ ],
+ "text/plain": [
+ "<IPython.core.display.HTML object>"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "sql = '''\n",
+ "SELECT TABLE_NAME\n",
+ "FROM INFORMATION_SCHEMA.TABLES\n",
+ "WHERE TABLE_SCHEMA = 'INFORMATION_SCHEMA'\n",
+ "'''\n",
+ "sql_client.show(sql)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "c6c4e1d4",
+ "metadata": {},
+ "source": [
+ "The query above showed the same results as `show_tables()`. That is not surprising: `show_tables()` just runs this query for you."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "7b944084",
+ "metadata": {},
+ "source": [
+ "The API also allows passing context parameters and query parameters using a request object. The API infers each query parameter's SQL type from its Python type. Pass context values as a Python `dict`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 18,
+ "id": "dd559827",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<div class=\"druid\"><table>\n",
+ "<tr><th>TABLE_NAME</th></tr>\n",
+ "<tr><td>COLUMNS</td></tr>\n",
+ "<tr><td>PARAMETERS</td></tr>\n",
+ "<tr><td>SCHEMATA</td></tr>\n",
+ "<tr><td>TABLES</td></tr>\n",
+ "</table></div>"
+ ],
+ "text/plain": [
+ "<IPython.core.display.HTML object>"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "sql = '''\n",
+ "SELECT TABLE_NAME\n",
+ "FROM INFORMATION_SCHEMA.TABLES\n",
+ "WHERE TABLE_SCHEMA = ?\n",
+ "'''\n",
+ "req = sql_client.sql_request(sql)\n",
+ "req.add_parameter('INFORMATION_SCHEMA')\n",
+ "req.with_context({\"someParameter\": \"someValue\"})\n",
+ "sql_client.show(req)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "937dc6b1",
+ "metadata": {},
+ "source": [
+ "The request has other features for advanced use cases: see the code for details. The `sql_query()` method returns an SQL response object. Use it if you want to get the values directly, work with the schema, and so on."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 20,
+ "id": "fd7a1827",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "sql = '''\n",
+ "SELECT TABLE_NAME\n",
+ "FROM INFORMATION_SCHEMA.TABLES\n",
+ "WHERE TABLE_SCHEMA = 'INFORMATION_SCHEMA'\n",
+ "'''\n",
+ "resp = sql_client.sql_query(sql)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 29,
+ "id": "2fe6a749",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "TABLE_NAME VARCHAR string\n"
+ ]
+ }
+ ],
+ "source": [
+ "col1 = resp.schema()[0]\n",
+ "print(col1.name, col1.sql_type, col1.druid_type)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 24,
+ "id": "41d27bb1",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "[{'TABLE_NAME': 'COLUMNS'},\n",
+ " {'TABLE_NAME': 'PARAMETERS'},\n",
+ " {'TABLE_NAME': 'SCHEMATA'},\n",
+ " {'TABLE_NAME': 'TABLES'}]"
+ ]
+ },
+ "execution_count": 24,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "resp.rows()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "481af1f2",
+ "metadata": {},
+ "source": [
+ "The `show()` method uses this information to format an HTML table that presents the results."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "9e3be017",
+ "metadata": {},
+ "source": [
+ "## MSQ Ingestion\n",
+ "\n",
+ "The SQL client also performs MSQ-based ingestion using `INSERT` or `REPLACE` statements. Use the extension check above to ensure that `druid-multi-stage-query` is loaded in Druid 26. (Later versions may have MSQ built in.)\n",
+ "\n",
+ "An MSQ query is run using a different API: `task()`. This API returns a response object that describes the Overlord task which runs the MSQ query. For tutorials, data is usually small enough that you can wait for the ingestion to complete. Do that with the `run_task()` call, which handles the waiting. To illustrate, here is a query that ingests a subset of columns and includes a few data clean-up steps:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 31,
+ "id": "10f1e451",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "sql = '''\n",
+ "REPLACE INTO \"myWiki1\" OVERWRITE ALL\n",
+ "SELECT\n",
+ " TIME_PARSE(\"timestamp\") AS \"__time\",\n",
+ " namespace,\n",
+ " page,\n",
+ " channel,\n",
+ " \"user\",\n",
+ " countryName,\n",
+ " CASE WHEN isRobot = 'true' THEN 1 ELSE 0 END AS isRobot,\n",
+ " \"added\",\n",
+ " \"delta\",\n",
+ " CASE WHEN isNew = 'true' THEN 1 ELSE 0 END AS isNew,\n",
+ " CAST(\"deltaBucket\" AS DOUBLE) AS deltaBucket,\n",
+ " \"deleted\"\n",
+ "FROM TABLE(\n",
+ " EXTERN(\n",
+ " '{\"type\":\"http\",\"uris\":[\"https://druid.apache.org/data/wikipedia.json.gz\"]}',\n",
+ " '{\"type\":\"json\"}',\n",
+ " '[{\"name\":\"isRobot\",\"type\":\"string\"},{\"name\":\"channel\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":\"string\"},{\"name\":\"flags\",\"type\":\"string\"},{\"name\":\"isUnpatrolled\",\"type\":\"string\"},{\"name\":\"page\",\"type\":\"string\"},{\"name\":\"diffUrl\",\"type\":\"string\"},{\"name\":\"added\",\"type\":\"long\"},{\"name\":\"comment\",\"type\":\"string\"},{\"name\":\"commentLength\",\"type\":\"long\"},{\"name\":\"isNew\",\"type\":\"string\"},{\"name\":\"isMinor\",\"type\":\"string\"},{\"name\":\"delta\",\"type\":\"long\"},{\"name\":\"isAnonymous\",\"type\":\"string\"},{\"name\":\"user\",\"type\":\"string\"},{\"name\":\"deltaBucket\",\"type\":\"long\"},{\"name\":\"deleted\",\"type\":\"long\"},{\"name\":\"namespace\",\"type\":\"string\"},{\"name\":\"cityName\",\"type\":\"string\"},{\"name\":\"countryName\",\"type\":\"string\"},{\"name\":\"regionIsoCode\",\"type\":\"string\"},{\"name\":\"metroCode\",\"type\":\"long\"},{\"name\":\"countryIsoCode\",\"type\":\"string\"},{\"name\":\"regionName\",\"type\":\"string\"}]'\n",
+ " )\n",
+ ")\n",
+ "PARTITIONED BY DAY\n",
+ "CLUSTERED BY namespace, page\n",
+ "'''"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 32,
+ "id": "d752b1d4",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "sql_client.run_task(sql)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "ef4512f8",
+ "metadata": {},
+ "source": [
+ "MSQ reports task completion as soon as ingestion is done. However, it takes a while for Druid to load the resulting segments. Wait for the table to become ready."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 34,
+ "id": "37fcedf2",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "sql_client.wait_until_ready('myWiki1')"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "11d9c95a",
+ "metadata": {},
+ "source": [
+ "`describe_table()` lists the columns in a table."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 35,
+ "id": "b662697b",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<div class=\"druid\"><table>\n",
+ "<tr><th>Position</th><th>Name</th><th>Type</th></tr>\n",
+ "<tr><td>1</td><td>__time</td><td>TIMESTAMP</td></tr>\n",
+ "<tr><td>2</td><td>namespace</td><td>VARCHAR</td></tr>\n",
+ "<tr><td>3</td><td>page</td><td>VARCHAR</td></tr>\n",
+ "<tr><td>4</td><td>channel</td><td>VARCHAR</td></tr>\n",
+ "<tr><td>5</td><td>user</td><td>VARCHAR</td></tr>\n",
+ "<tr><td>6</td><td>countryName</td><td>VARCHAR</td></tr>\n",
+ "<tr><td>7</td><td>isRobot</td><td>BIGINT</td></tr>\n",
+ "<tr><td>8</td><td>added</td><td>BIGINT</td></tr>\n",
+ "<tr><td>9</td><td>delta</td><td>BIGINT</td></tr>\n",
+ "<tr><td>10</td><td>isNew</td><td>BIGINT</td></tr>\n",
+ "<tr><td>11</td><td>deltaBucket</td><td>DOUBLE</td></tr>\n",
+ "<tr><td>12</td><td>deleted</td><td>BIGINT</td></tr>\n",
+ "</table></div>"
+ ],
+ "text/plain": [
+ "<IPython.core.display.HTML object>"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "sql_client.describe_table('myWiki1')"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "936f57fb",
+ "metadata": {},
+ "source": [
+ "You can sample a few rows of data."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 36,
+ "id": "c4cfa5dc",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<div class=\"druid\"><table>\n",
+ "<tr><th>__time</th><th>namespace</th><th>page</th><th>channel</th><th>user</th><th>countryName</th><th>isRobot</th><th>added</th><th>delta</th><th>isNew</th><th>deltaBucket</th><th>deleted</th></tr>\n",
+ "<tr><td>2016-06-27T00:00:11.080Z</td><td>Main</td><td>Salo Toraut</td><td>#sv.wikipedia</td><td>Lsjbot</td><td></td><td>1</td><td>31</td><td>31</td><td>1</td><td>0.0</td><td>0</td></tr>\n",
+ "<tr><td>2016-06-27T00:00:17.457Z</td><td>利用者</td><td>利用者:ワーナー成増/放送ウーマン賞</td><td>#ja.wikipedia</td><td>ワーナー成増</td><td></td><td>0</td><td>125</td><td>125</td><td>0</td><td>100.0</td><td>0</td></tr>\n",
+ "<tr><td>2016-06-27T00:00:34.959Z</td><td>Main</td><td>Bailando 2015</td><td>#en.wikipedia</td><td>181.230.118.178</td><td>Argentina</td><td>0</td><td>2</td><td>2</td><td>0</td><td>0.0</td><td>0</td></tr>\n",
+ "<tr><td>2016-06-27T00:00:36.027Z</td><td>Main</td><td>Richie Rich's Christmas Wish</td><td>#en.wikipedia</td><td>JasonAQuest</td><td></td><td>0</td><td>0</td><td>-2</td><td>0</td><td>-100.0</td><td>2</td></tr>\n",
+ "<tr><td>2016-06-27T00:00:46.874Z</td><td>Main</td><td>El Olivo, Ascensión</td><td>#sh.wikipedia</td><td>Kolega2357</td><td></td><td>1</td><td>0</td><td>-1</td><td>0</td><td>-100.0</td><td>1</td></tr>\n",
+ "<tr><td>2016-06-27T00:00:56.913Z</td><td>Main</td><td>Blowback (intelligence)</td><td>#en.wikipedia</td><td>Brokenshardz</td><td></td><td>0</td><td>76</td><td>76</td><td>0</td><td>0.0</td><td>0</td></tr>\n",
+ "<tr><td>2016-06-27T00:00:58.599Z</td><td>Kategoria</td><td>Kategoria:Dyskusje nad usunięciem artykułu zakończone bez konsensusu − lipiec 2016</td><td>#pl.wikipedia</td><td>Beau.bot</td><td></td><td>1</td><td>270</td><td>270</td><td>1</td><td>200.0</td><td>0</td></tr>\n",
+ "<tr><td>2016-06-27T00:01:01.364Z</td><td>Main</td><td>El Paraíso, Bachíniva</td><td>#sh.wikipedia</td><td>Kolega2357</td><td></td><td>1</td><td>0</td><td>-1</td><td>0</td><td>-100.0</td><td>1</td></tr>\n",
+ "<tr><td>2016-06-27T00:01:03.685Z</td><td>Main</td><td>El Terco, Bachíniva</td><td>#sh.wikipedia</td><td>Kolega2357</td><td></td><td>1</td><td>0</td><td>-1</td><td>0</td><td>-100.0</td><td>1</td></tr>\n",
+ "<tr><td>2016-06-27T00:01:07.347Z</td><td>Main</td><td>Neqerssuaq</td><td>#ceb.wikipedia</td><td>Lsjbot</td><td></td><td>1</td><td>4150</td><td>4150</td><td>1</td><td>4100.0</td><td>0</td></tr>\n",
+ "</table></div>"
+ ],
+ "text/plain": [
+ "<IPython.core.display.HTML object>"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "sql_client.show('SELECT * FROM myWiki1 LIMIT 10')"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "c1152f41",
+ "metadata": {},
+ "source": [
+ "## Datasource Client\n",
+ "\n",
+ "The datasource client lets you perform operations on datasource objects. The SQL layer lets you get metadata and run queries; the datasource client works with the underlying segments. Explaining the full functionality is the topic of another notebook. For now, you can use the datasource client to clean up the datasource created above. The `True` argument asks for \"if exists\" semantics so you don't get an error if the datasource was already deleted."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 37,
+ "id": "fba659ce",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "ds_client = druid.datasources()\n",
+ "ds_client.drop('myWiki1', True)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "c96fdcc6",
+ "metadata": {},
+ "source": [
+ "## Tasks Client\n",
+ "\n",
+ "Use the tasks client to work with Overlord tasks. The `run_task()` call above actually uses the task client internally to poll Overlord."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 40,
+ "id": "b4f5ea17",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "[{'id': 'query-24066a63-7e20-41bb-b212-80f193e6f2c8-worker0_0',\n",
+ " 'groupId': 'query-24066a63-7e20-41bb-b212-80f193e6f2c8',\n",
+ " 'type': 'query_worker',\n",
+ " 'createdTime': '2023-02-09T22:49:01.761Z',\n",
+ " 'queueInsertionTime': '1970-01-01T00:00:00.000Z',\n",
+ " 'statusCode': 'SUCCESS',\n",
+ " 'status': 'SUCCESS',\n",
+ " 'runnerStatusCode': 'NONE',\n",
+ " 'duration': 57895,\n",
+ " 'location': {'host': 'localhost', 'port': 8101, 'tlsPort': -1},\n",
+ " 'dataSource': 'myWiki1',\n",
+ " 'errorMsg': None},\n",
+ " {'id': 'query-24066a63-7e20-41bb-b212-80f193e6f2c8',\n",
+ " 'groupId': 'query-24066a63-7e20-41bb-b212-80f193e6f2c8',\n",
+ " 'type': 'query_controller',\n",
+ " 'createdTime': '2023-02-09T22:48:30.512Z',\n",
+ " 'queueInsertionTime': '1970-01-01T00:00:00.000Z',\n",
+ " 'statusCode': 'SUCCESS',\n",
+ " 'status': 'SUCCESS',\n",
+ " 'runnerStatusCode': 'NONE',\n",
+ " 'duration': 92476,\n",
+ " 'location': {'host': 'localhost', 'port': 8100, 'tlsPort': -1},\n",
+ " 'dataSource': 'myWiki1',\n",
+ " 'errorMsg': None}]"
+ ]
+ },
+ "execution_count": 40,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "task_client = druid.tasks()\n",
+ "task_client.tasks()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "1deaf95f",
+ "metadata": {},
+ "source": [
+ "## REST Client\n",
+ "\n",
+ "The Druid Python API starts with a REST client that itself is built on the `requests` package. The REST client implements the common patterns seen in the Druid REST API. You can create a client directly:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 45,
+ "id": "b1e55635",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from druidapi.rest import DruidRestClient\n",
+ "rest_client = DruidRestClient(\"http://localhost:8888\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "dcb8055f",
+ "metadata": {},
+ "source": [
+ "Or, if you have already created the Druid client, you can reuse the existing REST client. This is how the various other clients work internally."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "370ba76a",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "rest_client = druid.rest()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "2654e72c",
+ "metadata": {},
+ "source": [
+ "Use the REST client if you need to make calls that are not yet wrapped by the Python API, or if you want to do something special. To illustrate the client, you can make some of the same calls as in the [Druid REST API notebook](api_tutorial.ipynb).\n",
+ "\n",
+ "The REST client maintains the Druid host: you just provide the specific URL tail. There are methods to get or post JSON results. For example, to get status information:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 53,
+ "id": "9e42dfbc",
+ "metadata": {
+ "scrolled": true
+ },
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "{'version': '26.0.0-SNAPSHOT',\n",
+ " 'modules': [{'name': 'org.apache.druid.common.aws.AWSModule',\n",
+ " 'artifact': 'druid-aws-common',\n",
+ " 'version': '26.0.0-SNAPSHOT'},\n",
+ " {'name': 'org.apache.druid.common.gcp.GcpModule',\n",
+ " 'artifact': 'druid-gcp-common',\n",
+ " 'version': '26.0.0-SNAPSHOT'},\n",
+ " {'name': 'org.apache.druid.storage.hdfs.HdfsStorageDruidModule',\n",
+ " 'artifact': 'druid-hdfs-storage',\n",
+ " 'version': '26.0.0-SNAPSHOT'},\n",
+ " {'name': 'org.apache.druid.indexing.kafka.KafkaIndexTaskModule',\n",
+ " 'artifact': 'druid-kafka-indexing-service',\n",
+ " 'version': '26.0.0-SNAPSHOT'},\n",
+ " {'name': 'org.apache.druid.query.aggregation.datasketches.theta.SketchModule',\n",
+ " 'artifact': 'druid-datasketches',\n",
+ " 'version': '26.0.0-SNAPSHOT'},\n",
+ " {'name': 'org.apache.druid.query.aggregation.datasketches.theta.oldapi.OldApiSketchModule',\n",
+ " 'artifact': 'druid-datasketches',\n",
+ " 'version': '26.0.0-SNAPSHOT'},\n",
+ " {'name': 'org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchModule',\n",
+ " 'artifact': 'druid-datasketches',\n",
+ " 'version': '26.0.0-SNAPSHOT'},\n",
+ " {'name': 'org.apache.druid.query.aggregation.datasketches.tuple.ArrayOfDoublesSketchModule',\n",
+ " 'artifact': 'druid-datasketches',\n",
+ " 'version': '26.0.0-SNAPSHOT'},\n",
+ " {'name': 'org.apache.druid.query.aggregation.datasketches.hll.HllSketchModule',\n",
+ " 'artifact': 'druid-datasketches',\n",
+ " 'version': '26.0.0-SNAPSHOT'},\n",
+ " {'name': 'org.apache.druid.query.aggregation.datasketches.kll.KllSketchModule',\n",
+ " 'artifact': 'druid-datasketches',\n",
+ " 'version': '26.0.0-SNAPSHOT'},\n",
+ " {'name': 'org.apache.druid.msq.guice.MSQExternalDataSourceModule',\n",
+ " 'artifact': 'druid-multi-stage-query',\n",
+ " 'version': '26.0.0-SNAPSHOT'},\n",
+ " {'name': 'org.apache.druid.msq.guice.MSQIndexingModule',\n",
+ " 'artifact': 'druid-multi-stage-query',\n",
+ " 'version': '26.0.0-SNAPSHOT'},\n",
+ " {'name': 'org.apache.druid.msq.guice.MSQDurableStorageModule',\n",
+ " 'artifact': 'druid-multi-stage-query',\n",
+ " 'version': '26.0.0-SNAPSHOT'},\n",
+ " {'name': 'org.apache.druid.msq.guice.MSQServiceClientModule',\n",
+ " 'artifact': 'druid-multi-stage-query',\n",
+ " 'version': '26.0.0-SNAPSHOT'},\n",
+ " {'name': 'org.apache.druid.msq.guice.MSQSqlModule',\n",
+ " 'artifact': 'druid-multi-stage-query',\n",
+ " 'version': '26.0.0-SNAPSHOT'},\n",
+ " {'name': 'org.apache.druid.msq.guice.SqlTaskModule',\n",
+ " 'artifact': 'druid-multi-stage-query',\n",
+ " 'version': '26.0.0-SNAPSHOT'},\n",
+ " {'name': 'org.apache.druid.server.lookup.namespace.NamespaceExtractionModule',\n",
+ " 'artifact': 'druid-lookups-cached-global',\n",
+ " 'version': '26.0.0-SNAPSHOT'},\n",
+ " {'name': 'org.apache.druid.catalog.guice.CatalogCoordinatorModule',\n",
+ " 'artifact': 'druid-catalog',\n",
+ " 'version': '26.0.0-SNAPSHOT'},\n",
+ " {'name': 'org.apache.druid.catalog.guice.CatalogBrokerModule',\n",
+ " 'artifact': 'druid-catalog',\n",
+ " 'version': '26.0.0-SNAPSHOT'}],\n",
+ " 'memory': {'maxMemory': 134217728,\n",
+ " 'totalMemory': 134217728,\n",
+ " 'freeMemory': 80642696,\n",
+ " 'usedMemory': 53575032,\n",
+ " 'directMemory': 134217728}}"
+ ]
+ },
+ "execution_count": 53,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "rest_client.get_json('/status')"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "837e08b0",
+ "metadata": {},
+ "source": [
+ "A quick comparison of the three approaches (Requests, REST client, Python client):\n",
+ "\n",
+ "Status:\n",
+ "* Requests: `session.get(druid_host + '/status').json()`\n",
+ "* REST client: `rest_client.get_json('/status')`\n",
+ "* Status client: `status_client.status()`\n",
+ "\n",
+ "Health:\n",
+ "* Requests: `session.get(druid_host + '/status/health').json()`\n",
+ "* REST client: `rest_client.get_json('/status/health')`\n",
+ "* Status client: `status_client.is_healthy()`\n",
+ "\n",
+ "Ingest data:\n",
+ "* Requests: See the [REST tutorial](api_tutorial.ipynb)\n",
+ "* REST client: as in the REST tutorial, but use `rest_client.post_json('/druid/v2/sql/task', sql_request)` and\n",
+ "  `rest_client.get_json(f\"/druid/indexer/v1/task/{ingestion_taskId}/status\")`\n",
+ "* SQL client: `sql_client.run_task(sql)`, which also has a form that takes a full SQL request.\n",
+ "\n",
+ "List datasources:\n",
+ "* Requests: `session.get(druid_host + '/druid/coordinator/v1/datasources').json()`\n",
+ "* REST client: `rest_client.get_json('/druid/coordinator/v1/datasources')`\n",
+ "* Datasources client: `ds_client.names()`\n",
+ "\n",
+ "Query data:\n",
+ "* Requests: `session.post(druid_host + '/druid/v2/sql', json=sql_request).json()`\n",
+ "* REST client: `rest_client.post_json('/druid/v2/sql', sql_request)`\n",
Review Comment:
Right you are. Fixed.
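For reference, Druid documents its SQL query endpoint, `/druid/v2/sql`, as an HTTP POST that carries the SQL request as a JSON body, which is why the query rows above use `post` rather than `get`. A minimal sketch of that call, assuming the quickstart router at `http://localhost:8888` and using the standard library's `urllib.request` in place of the notebook's `requests` session; `build_sql_request` is a hypothetical helper, not part of `druidapi`. It only builds the request, so it runs without a cluster:

```python
import json
import urllib.request

# Assumption: the quickstart router address used throughout the notebook.
DRUID_HOST = 'http://localhost:8888'


def build_sql_request(sql, context=None):
    """Build, but do not send, a POST request for Druid's SQL endpoint.

    build_sql_request is a hypothetical helper for illustration; it is not
    part of the druidapi package.
    """
    body = {'query': sql}
    if context:
        # Optional query context, for example {'sqlQueryId': '...'}.
        body['context'] = context
    return urllib.request.Request(
        DRUID_HOST + '/druid/v2/sql',
        data=json.dumps(body).encode('utf-8'),
        headers={'Content-Type': 'application/json'},
        method='POST',
    )


req = build_sql_request('SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES')
print(req.get_method(), req.full_url)  # prints: POST http://localhost:8888/druid/v2/sql

# Against a running cluster, send it and decode the JSON rows:
# with urllib.request.urlopen(req) as resp:
#     rows = json.load(resp)
```

The same JSON body works unchanged with a `requests` session via `session.post(druid_host + '/druid/v2/sql', json=sql_request)`.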
##########
examples/quickstart/jupyter-notebooks/druidapi/sql.py:
##########
@@ -0,0 +1,690 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time, requests
+from . import consts, display
+from .consts import ROUTER_BASE
+from .util import is_blank, dict_get
+from .error import DruidError, ClientError
+
+REQ_ROUTER_QUERY = ROUTER_BASE
+REQ_ROUTER_SQL = ROUTER_BASE + '/sql'
+REQ_ROUTER_SQL_TASK = REQ_ROUTER_SQL + '/task'
+
+class SqlRequest:
+
+ def __init__(self, query_client, sql):
+ self.query_client = query_client
+ self.sql = sql
+ self.context = None
+ self.params = None
+ self.header = False
+ self.format = consts.SQL_OBJECT
+ self.headers = None
+ self.types = None
+ self.sqlTypes = None
+
+ def with_format(self, result_format):
+ self.format = result_format
+ return self
+
+ def with_headers(self, sqlTypes=False, druidTypes=False):
+        self.header = True
+ self.types = druidTypes
+ self.sqlTypes = sqlTypes
+ return self
+
+ def with_context(self, context):
+ if self.context is None:
+ self.context = context
+ else:
+ self.context.update(context)
+ return self
+
+ def with_parameters(self, params):
+ '''
+        Set the array of parameters. Parameters must each be a map of 'type'/'value' pairs:
+        {'type': the_type, 'value': the_value}. The type must be a valid SQL type
+ (in upper case). See the consts module for a list.
+ '''
+ if self.params is None:
+ self.params = params
+ else:
+            self.params.extend(params)
+ return self
+
+ def add_parameter(self, value):
+ '''
+        Add one parameter value. Infers the type of the parameter from the Python type.
+ '''
+ if value is None:
+ raise ClientError("Druid does not support null parameter values")
+ data_type = None
+ value_type = type(value)
+ if value_type is str:
+ data_type = consts.SQL_VARCHAR_TYPE
+ elif value_type is int:
+ data_type = consts.SQL_BIGINT_TYPE
+ elif value_type is float:
+ data_type = consts.SQL_DOUBLE_TYPE
+ elif value_type is list:
+ data_type = consts.SQL_ARRAY_TYPE
+ else:
+ raise ClientError("Unsupported value type")
+ if self.params is None:
+ self.params = []
+ self.params.append({'type': data_type, 'value': value})
+
+ def response_header(self):
+ self.header = True
+ return self
+
+ def request_headers(self, headers):
+ self.headers = headers
+ return self
+
+ def to_request(self):
+ query_obj = {"query": self.sql}
+ if self.context is not None and len(self.context) > 0:
+ query_obj['context'] = self.context
+ if self.params is not None and len(self.params) > 0:
+ query_obj['parameters'] = self.params
+ if self.header:
+ query_obj['header'] = True
+        if self.format is not None:
+ query_obj['resultFormat'] = self.format
+ if self.sqlTypes:
+ query_obj['sqlTypesHeader'] = self.sqlTypes
+ if self.types:
+ query_obj['typesHeader'] = self.types
+ return query_obj
+
+ def result_format(self):
+ return self.format.lower()
+
+ def run(self):
+ return self.query_client.sql_query(self)
+
+def parse_rows(fmt, context, results):
+ if fmt == consts.SQL_ARRAY_WITH_TRAILER:
+ rows = results['results']
+ elif fmt == consts.SQL_ARRAY:
+ rows = results
+ else:
+ return results
+ if not context.get('headers', False):
+ return rows
+ header_size = 1
+ if context.get('sqlTypesHeader', False):
+ header_size += 1
+ if context.get('typesHeader', False):
+ header_size += 1
+ return rows[header_size:]
+
+def label_non_null_cols(results):
+ if results is None or len(results) == 0:
+ return []
+ is_null = {}
+ for key in results[0].keys():
+ is_null[key] = True
+ for row in results:
+ for key, value in row.items():
+ if type(value) == str:
+ if value != '':
+ is_null[key] = False
+ elif type(value) in (int, float):
+ if value != 0:
+ is_null[key] = False
+ elif value is not None:
+ is_null[key] = False
+ return is_null
+
+def filter_null_cols(results):
+ '''
+ Filter columns from a Druid result set by removing all null-like
+ columns. A column is considered null if all values for that column
+ are null. A value is null if it is either a JSON null, an empty
+ string, or a numeric 0. All rows are preserved, as is the order
+ of the remaining columns.
+ '''
+ if results is None or len(results) == 0:
+ return results
+ is_null = label_non_null_cols(results)
+ revised = []
+ for row in results:
+ new_row = {}
+ for key, value in row.items():
+ if is_null[key]:
+ continue
+ new_row[key] = value
+ revised.append(new_row)
+ return revised
+
+def parse_object_schema(results):
+ schema = []
+ if len(results) == 0:
+ return schema
+ row = results[0]
+ for k, v in row.items():
+ druid_type = None
+ sql_type = None
+ if type(v) is str:
+ druid_type = consts.DRUID_STRING_TYPE
+ sql_type = consts.SQL_VARCHAR_TYPE
+ elif type(v) is int or type(v) is float:
+ druid_type = consts.DRUID_LONG_TYPE
+ sql_type = consts.SQL_BIGINT_TYPE
+ schema.append(ColumnSchema(k, sql_type, druid_type))
+ return schema
+
+def parse_array_schema(context, results):
+ schema = []
+ if len(results) == 0:
+ return schema
+ has_headers = context.get(consts.HEADERS_KEY, False)
+ if not has_headers:
+ return schema
+ has_sql_types = context.get(consts.SQL_TYPES_HEADERS_KEY, False)
+ has_druid_types = context.get(consts.DRUID_TYPE_HEADERS_KEY, False)
+ size = len(results[0])
+ for i in range(size):
+ druid_type = None
+ if has_druid_types:
+ druid_type = results[1][i]
+ sql_type = None
+ if has_sql_types:
+ sql_type = results[2][i]
+ schema.append(ColumnSchema(results[0][i], sql_type, druid_type))
+ return schema
+
+def parse_schema(fmt, context, results):
+ if fmt == consts.SQL_OBJECT:
+ return parse_object_schema(results)
+ elif fmt == consts.SQL_ARRAY or fmt == consts.SQL_ARRAY_WITH_TRAILER:
+ return parse_array_schema(context, results)
+ else:
+ return []
+
+def is_response_ok(http_response):
+ code = http_response.status_code
+ return code == requests.codes.ok or code == requests.codes.accepted
+
+class ColumnSchema:
+
+ def __init__(self, name, sql_type, druid_type):
+ self.name = name
+ self.sql_type = sql_type
+ self.druid_type = druid_type
+
+ def __str__(self):
+ return "{{name={}, SQL type={}, Druid type={}}}".format(self.name, self.sql_type, self.druid_type)
+
+class SqlQueryResult:
+ """
+ Defines the core protocol for Druid SQL queries.
+ """
+
+ def __init__(self, request, response):
+ self.http_response = response
+ self._json = None
+ self._rows = None
+ self._schema = None
+ self.request = request
+ self._error = None
+ self._id = None
+ if not is_response_ok(response):
+ try:
+ self._error = response.json()
+ except Exception:
+ self._error = response.text
+ if self._error is None or len(self._error) == 0:
+ self._error = "Failed with HTTP status {}".format(response.status_code)
+ try:
+ self._id = self.http_response.headers['X-Druid-SQL-Query-Id']
+ except KeyError:
+ self._error = "Query returned no query ID"
+
+ def result_format(self):
+ return self.request.result_format()
+
+ def ok(self):
+ """
+ Reports if the query succeeded.
+
+ The query rows and schema are available only if ok() returns True.
+ """
+ return is_response_ok(self.http_response)
+
+ def error_msg(self):
+ err = self.error()
+ if err is None:
+ return "unknown"
+ if type(err) is str:
+ return err
+ msg = err.get("error")
+ text = err.get("errorMessage")
+ if msg is None and text is None:
+ return "unknown"
+ if msg is None:
+ return text
+ if text is None:
+ return msg
+ return msg + ": " + text
+
+ def id(self):
+ """
+ Returns the unique identifier for the query.
+ """
+ return self._id
+
+ def non_null(self):
+ if not self.ok():
+ return None
+ if self.result_format() != consts.SQL_OBJECT:
+ return None
+ return filter_null_cols(self.rows())
+
+ def as_array(self):
+ if self.result_format() == consts.SQL_OBJECT:
+ rows = []
+ for obj in self.rows():
+ rows.append([v for v in obj.values()])
+ return rows
+ else:
+ return self.rows()
+
+ def error(self):
+ """
+ If the query fails, returns the error, if any provided by Druid.
+ """
+ if self.ok():
+ return None
+ if self._error is not None:
+ return self._error
+ if self.http_response is None:
+ return { "error": "unknown"}
+ if is_response_ok(self.http_response):
+ return None
+ return {"error": "HTTP {}".format(self.http_response.status_code)}
+
+ def json(self):
+ if not self.ok():
+ return None
+ if self._json is None:
+ self._json = self.http_response.json()
+ return self._json
+
+ def rows(self):
+ """
+ Returns the rows of data for the query.
+
+ Druid supports many data formats. The method makes its best
+ attempt to map the format into an array of rows of some sort.
+ """
+ if self._rows is None:
+ json = self.json()
+ if json is None:
+ return self.http_response.text
+ self._rows = parse_rows(self.result_format(), self.request.context, json)
+ return self._rows
+
+ def schema(self):
+ """
+ Returns the data schema as a list of ColumnSchema objects.
+
+ Druid supports many data formats, not all of them provide
+ schema information. This method makes its best attempt to
+ extract the schema from the query results.
+ """
+ if self._schema is None:
+ self._schema = parse_schema(self.result_format(), self.request.context, self.json())
+ return self._schema
+
+ def show(self, non_null=False):
+ data = None
+ if non_null:
+ data = self.non_null()
+ if data is None:
+ data = self.as_array()
+ if data is None or len(data) == 0:
+ display.display.show_message("Query returned no results")
+ return
+ disp = display.display.table()
+ disp.headers([c.name for c in self.schema()])
+ disp.show(data)
+
+ def show_schema(self):
+ disp = display.display.table()
+ disp.headers(['Name', 'SQL Type', 'Druid Type'])
+ data = []
+ for c in self.schema():
+ data.append([c.name, c.sql_type, c.druid_type])
+ disp.show(data)
+
+class QueryTaskResult:
+
+ def __init__(self, request, response):
+ self._request = request
+ self.http_response = response
+ self._status = None
+ self._results = None
+ self._details = None
+ self._schema = None
+ self._rows = None
+ self._reports = None
+ self._error = None
+ self._id = None
+ if not is_response_ok(response):
+ self._state = consts.FAILED_STATE
+ try:
+ self._error = response.json()
+ except Exception:
+ self._error = response.text
+ if self._error is None or len(self._error) == 0:
+ self._error = "Failed with HTTP status {}".format(response.status_code)
+ return
+
+ # Typical response:
+ # {'taskId': '6f7b514a446d4edc9d26a24d4bd03ade_fd8e242b-7d93-431d-b65b-2a512116924c_bjdlojgj',
+ # 'state': 'RUNNING'}
+ self.response_obj = response.json()
+ self._id = self.response_obj['taskId']
+ self._state = self.response_obj['state']
+
+ def ok(self):
+ """
+ Reports if the query succeeded.
+
+ The query rows and schema are available only if ok() returns True.
+ """
+ return self._error is None
+
+ def id(self):
+ return self._id
+
+ def _tasks(self):
+ return self._request.query_client.druid_client.tasks()
+
+ def status(self):
+ """
+ Polls Druid for an update on the query run status.
+ """
+ self.check_valid()
+ # Example:
+ # {'task': 'talaria-sql-w000-b373b68d-2675-4035-b4d2-7a9228edead6',
+ # 'status': {
+ # 'id': 'talaria-sql-w000-b373b68d-2675-4035-b4d2-7a9228edead6',
+ # 'groupId': 'talaria-sql-w000-b373b68d-2675-4035-b4d2-7a9228edead6',
+ # 'type': 'talaria0', 'createdTime': '2022-04-28T23:19:50.331Z',
+ # 'queueInsertionTime': '1970-01-01T00:00:00.000Z',
+ # 'statusCode': 'RUNNING', 'status': 'RUNNING', 'runnerStatusCode': 'PENDING',
+ # 'duration': -1, 'location': {'host': None, 'port': -1, 'tlsPort': -1},
+ # 'dataSource': 'w000', 'errorMsg': None}}
+ self._status = self._tasks().task_status(self._id)
+ self._state = self._status['status']['status']
+ if self._state == consts.FAILED_STATE:
+ self._error = self._status['status']['errorMsg']
+ return self._status
+
+ def done(self):
+ """
+ Reports if the query is done: succeeded or failed.
+ """
+ return self._state == consts.FAILED_STATE or self._state == consts.SUCCESS_STATE
+
+ def succeeded(self):
+ """
+ Reports if the query succeeded.
+ """
+ return self._state == consts.SUCCESS_STATE
+
+ def state(self):
+ """
+ Reports the engine-specific query state.
+
+ Updated after each call to status().
+ """
+ return self._state
+
+ def error(self):
+ return self._error
+
+ def error_msg(self):
+ err = self.error()
+ if err is None:
+ return "unknown"
+ if type(err) is str:
+ return err
+ msg = dict_get(err, "error")
+ text = dict_get(err, "errorMessage")
+ if msg is None and text is None:
+ return "unknown"
+ if text is not None:
+ text = text.replace('\\n', '\n')
+ if msg is None:
+ return text
+ if text is None:
+ return msg
+ return msg + ": " + text
+
+ def join(self):
+ if not self.done():
+ self.status()
+ while not self.done():
+ time.sleep(0.5)
+ self.status()
+ return self.succeeded()
+
+ def check_valid(self):
+ if self._id is None:
+ raise ClientError("Operation is invalid on a failed query")
+
+ def wait_done(self):
+ if not self.join():
+ raise DruidError("Query failed: " + self.error_msg())
+
+ def wait(self):
+ self.wait_done()
+ return self.rows()
+
+ def reports(self) -> dict:
+ self.check_valid()
+ if self._reports is None:
+ self.join()
+ self._reports = self._tasks().task_reports(self._id)
+ return self._reports
+
+ def results(self):
+ if self._results is None:
+ rpts = self.reports()
+ self._results = rpts['multiStageQuery']['payload']['results']
+ return self._results
+
+ def schema(self):
+ if self._schema is None:
+ results = self.results()
+ sig = results['signature']
+ sqlTypes = results['sqlTypeNames']
+ size = len(sig)
+ self._schema = []
+ for i in range(size):
+ self._schema.append(ColumnSchema(sig[i]['name'], sqlTypes[i], sig[i]['type']))
+ return self._schema
+
+ def rows(self):
+ if self._rows is None:
+ results = self.results()
+ self._rows = results['results']
+ return self._rows
+
+ def show(self, non_null=False):
+ data = self.rows()
+ if non_null:
+ data = filter_null_cols(data)
+ disp = display.display.table()
+ disp.headers([c.name for c in self.schema()])
+ disp.show(data)
+
+class QueryClient:
+
+ def __init__(self, druid, rest_client=None):
+ self.druid_client = druid
+ self._rest_client = druid.rest_client if rest_client is None else rest_client
+
+ def rest_client(self):
+ return self._rest_client
+
+ def _prepare_query(self, request):
+ if request is None:
+ raise ClientError("No query provided.")
+ if type(request) == str:
+ request = self.sql_request(request)
+ if is_blank(request.sql):
+ raise ClientError("No query provided.")
+ if self.rest_client().trace:
+ print(request.sql)
+ query_obj = request.to_request()
+ return (request, query_obj)
+
+ def sql_query(self, request) -> SqlQueryResult:
+ '''
+ Submit a SQL query with control over the context, parameters and other
+ options. Returns a response with either a detailed error message, or
+ the rows and query ID.
+ '''
+ request, query_obj = self._prepare_query(request)
+ r = self.rest_client().post_only_json(REQ_ROUTER_SQL, query_obj, headers=request.headers)
+ return SqlQueryResult(request, r)
+
+ def sql(self, sql, *args):
+ if len(args) > 0:
+ sql = sql.format(*args)
+ resp = self.sql_query(sql)
+ if resp.ok():
+ return resp.rows()
+ raise ClientError(resp.error_msg())
+
+ def explain_sql(self, query):
+ """
+ Run an EXPLAIN PLAN FOR query for the given query.
+
+ Returns
+ -------
+ An object with the plan JSON parsed into Python objects:
+ plan: the query plan
+ columns: column schema
+ tables: dictionary of name/type pairs
+ """
+ if is_blank(query):
+ raise ClientError("No query provided.")
+ results = self.sql('EXPLAIN PLAN FOR ' + query)
+ return results[0]
+
+ def sql_request(self, sql):
+ return SqlRequest(self, sql)
+
+ def show(self, query):
+ result = self.sql_query(query)
+ if result.ok():
+ result.show()
+ else:
+ display.display.show_error(result.error_msg())
+
+ def task(self, request):
+ request, query_obj = self._prepare_query(request)
+ r = self.rest_client().post_only_json(REQ_ROUTER_SQL_TASK, query_obj, headers=request.headers)
+ return QueryTaskResult(request, r)
+
+ def run_task(self, request):
+ resp = self.task(request)
+ if not resp.ok():
+ raise ClientError(resp.error_msg())
+ resp.wait_done()
+
+ def _tables_query(self, schema):
+ return self.sql_query('''
+ SELECT TABLE_NAME AS TableName
+ FROM INFORMATION_SCHEMA.TABLES
+ WHERE TABLE_SCHEMA = '{}'
+ ORDER BY TABLE_NAME
+ '''.format(schema))
+
+ def tables(self, schema=consts.DRUID_SCHEMA):
+ return self._tables_query(schema).rows()
+
+ def show_tables(self, schema=consts.DRUID_SCHEMA):
+ self._tables_query(schema).show()
+
+ def _schemas_query(self):
+ return self.sql_query('''
+ SELECT SCHEMA_NAME AS SchemaName
+ FROM INFORMATION_SCHEMA.SCHEMATA
+ ORDER BY SCHEMA_NAME
+ ''')
+
+ def show_schemas(self):
+ self._schemas_query().show()
+
+ def describe_table(self, part1, part2=None):
Review Comment:
Rewritten to make clearer.
##########
examples/quickstart/jupyter-notebooks/druidapi/sql.py:
##########
@@ -0,0 +1,690 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time, requests
+from . import consts, display
+from .consts import ROUTER_BASE
+from .util import is_blank, dict_get
+from .error import DruidError, ClientError
+
+REQ_ROUTER_QUERY = ROUTER_BASE
+REQ_ROUTER_SQL = ROUTER_BASE + '/sql'
+REQ_ROUTER_SQL_TASK = REQ_ROUTER_SQL + '/task'
+
+class SqlRequest:
+
+ def __init__(self, query_client, sql):
+ self.query_client = query_client
+ self.sql = sql
+ self.context = None
+ self.params = None
+ self.header = False
+ self.format = consts.SQL_OBJECT
+ self.headers = None
+ self.types = None
+ self.sqlTypes = None
+
+ def with_format(self, result_format):
+ self.format = result_format
+ return self
+
+ def with_headers(self, sqlTypes=False, druidTypes=False):
+ self.header = True
+ self.types = druidTypes
+ self.sqlTypes = sqlTypes
+ return self
+
+ def with_context(self, context):
+ if self.context is None:
+ self.context = context
+ else:
+ self.context.update(context)
+ return self
+
+ def with_parameters(self, params):
+ '''
+ Set the array of parameters. Parameters must each be a map of 'type'/'value' pairs:
+ {'type': the_type, 'value': the_value}. The type must be a valid SQL type
+ (in upper case). See the consts module for a list.
+ '''
+ if self.params is None:
+ self.params = params
+ else:
+ self.params.extend(params)
+ return self
+
+ def add_parameter(self, value):
+ '''
+ Add one parameter value. Infers the type of the parameter from the Python type.
+ '''
+ if value is None:
+ raise ClientError("Druid does not support null parameter values")
+ data_type = None
+ value_type = type(value)
+ if value_type is str:
+ data_type = consts.SQL_VARCHAR_TYPE
+ elif value_type is int:
+ data_type = consts.SQL_BIGINT_TYPE
+ elif value_type is float:
+ data_type = consts.SQL_DOUBLE_TYPE
+ elif value_type is list:
+ data_type = consts.SQL_ARRAY_TYPE
+ else:
+ raise ClientError("Unsupported value type")
+ if self.params is None:
+ self.params = []
+ self.params.append({'type': data_type, 'value': value})
+
+ def response_header(self):
+ self.header = True
+ return self
+
+ def request_headers(self, headers):
+ self.headers = headers
+ return self
+
+ def to_request(self):
+ query_obj = {"query": self.sql}
+ if self.context is not None and len(self.context) > 0:
+ query_obj['context'] = self.context
+ if self.params is not None and len(self.params) > 0:
+ query_obj['parameters'] = self.params
+ if self.header:
+ query_obj['header'] = True
+ if self.format is not None:
+ query_obj['resultFormat'] = self.format
+ if self.sqlTypes:
+ query_obj['sqlTypesHeader'] = self.sqlTypes
+ if self.types:
+ query_obj['typesHeader'] = self.types
+ return query_obj
+
+ def result_format(self):
+ return self.format.lower()
+
+ def run(self):
+ return self.query_client.sql_query(self)
+
+def parse_rows(fmt, context, results):
+ if fmt == consts.SQL_ARRAY_WITH_TRAILER:
+ rows = results['results']
+ elif fmt == consts.SQL_ARRAY:
+ rows = results
+ else:
+ return results
+ if not context.get('headers', False):
+ return rows
+ header_size = 1
+ if context.get('sqlTypesHeader', False):
+ header_size += 1
+ if context.get('typesHeader', False):
+ header_size += 1
+ return rows[header_size:]
+
+def label_non_null_cols(results):
+ if results is None or len(results) == 0:
Review Comment:
Right. The challenge is that I'm often checking only for the `None` case,
not for the `[]`, `0` or `False` cases. I'd prefer not to write `not foo and
foo != False`, say. I modified those expressions where it is safe to use the more
general form, and left those where it would match the wrong values.
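A minimal sketch of the distinction, using two hypothetical helpers modeled loosely on `filter_null_cols()` and `add_parameter()` in the diff above. They are not part of `druidapi`, and `check_param()` raises `ValueError` rather than `ClientError` only to stay self-contained:

```python
def has_rows(results):
    # Truthiness is safe here: None and [] both mean "no rows",
    # so `not results` can replace `results is None or len(results) == 0`.
    return bool(results)


def check_param(value):
    # An explicit None test is required here: 0, '' and False are
    # legitimate parameter values, and `if not value` would reject them.
    if value is None:
        raise ValueError("Druid does not support null parameter values")
    return value


print(has_rows(None), has_rows([]), has_rows([{'a': 1}]))  # False False True
print(check_param(0), repr(check_param('')), check_param(False))  # 0 '' False
```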
##########
examples/quickstart/jupyter-notebooks/druidapi/sql.py:
##########
@@ -0,0 +1,690 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time, requests
+from . import consts, display
+from .consts import ROUTER_BASE
+from .util import is_blank, dict_get
+from .error import DruidError, ClientError
+
+REQ_ROUTER_QUERY = ROUTER_BASE
+REQ_ROUTER_SQL = ROUTER_BASE + '/sql'
+REQ_ROUTER_SQL_TASK = REQ_ROUTER_SQL + '/task'
+
+class SqlRequest:
+
+ def __init__(self, query_client, sql):
+ self.query_client = query_client
+ self.sql = sql
+ self.context = None
+ self.params = None
+ self.header = False
+ self.format = consts.SQL_OBJECT
+ self.headers = None
+ self.types = None
+ self.sqlTypes = None
+
+ def with_format(self, result_format):
+ self.format = result_format
+ return self
+
+ def with_headers(self, sqlTypes=False, druidTypes=False):
+ self.header = True
+ self.types = druidTypes
+ self.sqlTypes = sqlTypes
+ return self
+
+ def with_context(self, context):
+ if self.context is None:
+ self.context = context
+ else:
+ self.context.update(context)
+ return self
+
+ def with_parameters(self, params):
+ '''
+ Set the array of parameters. Parameters must each be a map of 'type'/'value' pairs:
+ {'type': the_type, 'value': the_value}. The type must be a valid SQL type
+ (in upper case). See the consts module for a list.
+ '''
+ if self.params is None:
+ self.params = params
+ else:
+ self.params.extend(params)
+ return self
+
+ def add_parameter(self, value):
+ '''
+ Add one parameter value. Infers the type of the parameter from the Python type.
+ '''
+ if value is None:
+ raise ClientError("Druid does not support null parameter values")
+ data_type = None
+ value_type = type(value)
+ if value_type is str:
+ data_type = consts.SQL_VARCHAR_TYPE
+ elif value_type is int:
+ data_type = consts.SQL_BIGINT_TYPE
+ elif value_type is float:
+ data_type = consts.SQL_DOUBLE_TYPE
+ elif value_type is list:
+ data_type = consts.SQL_ARRAY_TYPE
+ else:
+ raise ClientError("Unsupported value type")
+ if self.params is None:
+ self.params = []
+ self.params.append({'type': data_type, 'value': value})
+
+ def response_header(self):
+ self.header = True
+ return self
+
+ def request_headers(self, headers):
+ self.headers = headers
+ return self
+
+ def to_request(self):
+ query_obj = {"query": self.sql}
+ if self.context is not None and len(self.context) > 0:
+ query_obj['context'] = self.context
+ if self.params is not None and len(self.params) > 0:
+ query_obj['parameters'] = self.params
+ if self.header:
+ query_obj['header'] = True
+ if self.format is not None:
+ query_obj['resultFormat'] = self.format
+ if self.sqlTypes:
+ query_obj['sqlTypesHeader'] = self.sqlTypes
+ if self.types:
+ query_obj['typesHeader'] = self.types
+ return query_obj
+
+ def result_format(self):
+ return self.format.lower()
+
+ def run(self):
+ return self.query_client.sql_query(self)
+
+def parse_rows(fmt, context, results):
+ if fmt == consts.SQL_ARRAY_WITH_TRAILER:
+ rows = results['results']
+ elif fmt == consts.SQL_ARRAY:
+ rows = results
+ else:
+ return results
+ if not context.get('headers', False):
+ return rows
+ header_size = 1
+ if context.get('sqlTypesHeader', False):
+ header_size += 1
+ if context.get('typesHeader', False):
+ header_size += 1
+ return rows[header_size:]
+
+def label_non_null_cols(results):
+ if results is None or len(results) == 0:
+ return []
+ is_null = {}
+ for key in results[0].keys():
+ is_null[key] = True
+ for row in results:
+ for key, value in row.items():
+ if type(value) == str:
+ if value != '':
+ is_null[key] = False
+ elif type(value) in (int, float):
+ if value != 0:
+ is_null[key] = False
+ elif value is not None:
+ is_null[key] = False
+ return is_null
+
+def filter_null_cols(results):
+ '''
+ Filter columns from a Druid result set by removing all null-like
+ columns. A column is considered null if all values for that column
+ are null. A value is null if it is either a JSON null, an empty
+ string, or a numeric 0. All rows are preserved, as is the order
+ of the remaining columns.
+ '''
+ if results is None or len(results) == 0:
+ return results
+ is_null = label_non_null_cols(results)
+ revised = []
+ for row in results:
+ new_row = {}
+ for key, value in row.items():
+ if is_null[key]:
+ continue
+ new_row[key] = value
+ revised.append(new_row)
+ return revised
+
+def parse_object_schema(results):
+ schema = []
+ if len(results) == 0:
+ return schema
+ row = results[0]
+ for k, v in row.items():
+ druid_type = None
+ sql_type = None
+ if type(v) is str:
+ druid_type = consts.DRUID_STRING_TYPE
+ sql_type = consts.SQL_VARCHAR_TYPE
+ elif type(v) is int or type(v) is float:
+ druid_type = consts.DRUID_LONG_TYPE
+ sql_type = consts.SQL_BIGINT_TYPE
+ schema.append(ColumnSchema(k, sql_type, druid_type))
+ return schema
+
+def parse_array_schema(context, results):
+ schema = []
+ if len(results) == 0:
+ return schema
+ has_headers = context.get(consts.HEADERS_KEY, False)
+ if not has_headers:
+ return schema
+ has_sql_types = context.get(consts.SQL_TYPES_HEADERS_KEY, False)
+ has_druid_types = context.get(consts.DRUID_TYPE_HEADERS_KEY, False)
+ size = len(results[0])
+ for i in range(size):
+ druid_type = None
+ if has_druid_types:
+ druid_type = results[1][i]
+ sql_type = None
+ if has_sql_types:
+ sql_type = results[2][i]
+ schema.append(ColumnSchema(results[0][i], sql_type, druid_type))
+ return schema
+
+def parse_schema(fmt, context, results):
+ if fmt == consts.SQL_OBJECT:
+ return parse_object_schema(results)
+ elif fmt == consts.SQL_ARRAY or fmt == consts.SQL_ARRAY_WITH_TRAILER:
+ return parse_array_schema(context, results)
+ else:
+ return []
+
+def is_response_ok(http_response):
+ code = http_response.status_code
+ return code == requests.codes.ok or code == requests.codes.accepted
+
+class ColumnSchema:
+
+ def __init__(self, name, sql_type, druid_type):
+ self.name = name
+ self.sql_type = sql_type
+ self.druid_type = druid_type
+
+ def __str__(self):
+ return "{{name={}, SQL type={}, Druid type={}}}".format(self.name, self.sql_type, self.druid_type)
+
+class SqlQueryResult:
+ """
+ Defines the core protocol for Druid SQL queries.
+ """
+
+ def __init__(self, request, response):
+ self.http_response = response
+ self._json = None
+ self._rows = None
+ self._schema = None
+ self.request = request
+ self._error = None
+ self._id = None
+ if not is_response_ok(response):
+ try:
+ self._error = response.json()
+ except Exception:
+ self._error = response.text
+ if self._error is None or len(self._error) == 0:
+ self._error = "Failed with HTTP status {}".format(response.status_code)
+ try:
+ self._id = self.http_response.headers['X-Druid-SQL-Query-Id']
+ except KeyError:
+ self._error = "Query returned no query ID"
+
+ def result_format(self):
+ return self.request.result_format()
+
+ def ok(self):
+ """
+ Reports if the query succeeded.
Review Comment:
The `SqlQueryResult` is for a "regular" SQL query, which is request/response.
In this case, `ok()` means we didn't get an error, and thus we _did_ get data.

`QueryTaskResult` is for MSQ, which is async. So we have to first wait for
a result, then see if the task completed. If the task completed, we can get the
results. Those results can be rows, if the query turned out to be a `SELECT`
query. The problem is that, from the perspective of this API, there is no way to
tell an ingest query from a "query query". So, for the task case, the rows are
available only if the result is `ok()` and the query was one that returns data.
I'm trusting the user to know which kind of query they issued, and when it
makes sense to ask for results. These results, in the present version, are in
the task reports.

This is why we should build a client in parallel with designing the API: it
avoids the need for silly explanations of the kind I just provided.

Ideally, the sync and async results would share code. I once had a version
that worked that way. But the differences between "classic" and MSQ are such
that only a few method signatures were the same. Since Python has duck typing,
it turned out to be simpler to just have two separate response classes, with
similar methods where it makes sense. This was one of those places.
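A minimal sketch of the duck-typing approach described above. `SyncResult`, `AsyncResult`, and `fetch_rows()` are hypothetical stand-ins, not `druidapi` classes: the sync result has data immediately, while the async one must be polled until its simulated task finishes. The real `QueryTaskResult.join()` sleeps 0.5 seconds between polls; the sketch defaults to no delay. The caller relies only on the methods each object happens to have:

```python
import time


class SyncResult:
    """Stand-in for a request/response result: data is available at once."""

    def __init__(self, rows, error=None):
        self._rows = rows
        self._error = error

    def ok(self):
        return self._error is None

    def rows(self):
        return self._rows


class AsyncResult:
    """Stand-in for a task result: must poll until the task finishes."""

    def __init__(self, rows, polls_needed=2):
        self._rows = rows
        self._remaining = polls_needed
        self._state = 'RUNNING'

    def status(self):
        # Each poll moves the simulated task closer to completion.
        self._remaining -= 1
        if self._remaining <= 0:
            self._state = 'SUCCESS'
        return self._state

    def done(self):
        return self._state in ('SUCCESS', 'FAILED')

    def join(self, interval=0.0):
        # Poll until the task reaches a terminal state.
        while not self.done():
            self.status()
            time.sleep(interval)
        return self._state == 'SUCCESS'

    def ok(self):
        return self._state != 'FAILED'

    def rows(self):
        return self._rows


def fetch_rows(result):
    # Duck typing: only async results have join(); sync ones are ready now.
    join = getattr(result, 'join', None)
    if join is not None and not join():
        raise RuntimeError("task failed")
    if not result.ok():
        raise RuntimeError("query failed")
    return result.rows()
```

Because `fetch_rows()` checks for `join()` with `getattr` instead of testing the class, either result type can be passed without a shared base class, which is the trade-off the comment describes.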
--
This is an automated message from the Apache Git Service.