This is an automated email from the ASF dual-hosted git repository.
vogievetsky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c010e42988c feat: Add "Convert to SQL" option for streaming supervisors (#19547)
c010e42988c is described below
commit c010e42988cba553218488912dd87a1f758935e3
Author: Kyle Hoondert <[email protected]>
AuthorDate: Tue Jun 23 22:43:02 2026 +0100
feat: Add "Convert to SQL" option for streaming supervisors (#19547)
* Add supervisor-to-SQL dialog
* Added tests
* feat: address review feedback on supervisor-to-SQL conversion
- Preserve native column types in the EXTERN signature so numeric metric
and typed dimension fields are not declared as strings
- Apply the supervisor's segment granularity to PARTITIONED BY and query
granularity via TIME_FLOOR (also used in GROUP BY)
- Preserve the supervisor's inputFormat settings, overriding only the type
- Escape custom timestamp formats with the query-toolkit literal helper
- Clear stale specs in the paste-mode dialog so Generate SQL can't submit
a hidden supervisor
- Open the converted query in a new workbench tab instead of overwriting
the active tab
- Add tests for the new behaviors and update snapshots
Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
* refactor: address review feedback on supervisor-to-SQL conversion
Reuse the existing ingestion-spec converter rather than reimplementing it:
- convertSupervisorToSql now rewrites the supervisor into an index_parallel
spec (file inputSource/inputFormat, default segment granularity, leading
dimension clustering) and delegates to convertSpecToSql, so column types,
granularity, timestamp parsing, and metric aggregation are all shared
- Reuse the IngestionSpec interface instead of a bespoke SupervisorSpec; drop
the duplicated MetricSpec interface and metric-to-SQL helpers (~400 lines)
- Convert the conversion tests to inline snapshots
- Dialog: use IngestionSpec, fix Blueprint 5 scss namespace (.#{$bp-ns}),
replace the native select with a Button + Menu dropdown matching the
console style, and clarify the SQL is a one-time batch (not streaming)
- Don't auto-select the first supervisor; the button shows "Select
supervisor" until one is chosen
- Remove the single-export index.ts barrel
Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
* style: run prettier on supervisor-to-SQL files
Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
---------
Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
Co-authored-by: Kyle Hoondert <[email protected]>
---
web-console/src/dialogs/index.ts | 1 +
.../supervisor-to-sql-dialog.scss | 44 +++
.../supervisor-to-sql-dialog.spec.tsx | 109 ++++++
.../supervisor-to-sql-dialog.tsx | 312 +++++++++++++++++
.../src/helpers/supervisor-conversion.spec.ts | 369 +++++++++++++++++++++
web-console/src/helpers/supervisor-conversion.ts | 106 ++++++
.../src/views/workbench-view/workbench-view.tsx | 37 ++-
7 files changed, 977 insertions(+), 1 deletion(-)
diff --git a/web-console/src/dialogs/index.ts b/web-console/src/dialogs/index.ts
index 00212ce566d..abe14cd36b8 100644
--- a/web-console/src/dialogs/index.ts
+++ b/web-console/src/dialogs/index.ts
@@ -39,6 +39,7 @@ export * from './string-input-dialog/string-input-dialog';
export * from
'./supervisor-reset-offsets-dialog/supervisor-reset-offsets-dialog';
export * from
'./supervisor-reset-to-latest-dialog/supervisor-reset-to-latest-dialog';
export * from
'./supervisor-table-action-dialog/supervisor-table-action-dialog';
+export * from './supervisor-to-sql-dialog/supervisor-to-sql-dialog';
export * from './table-action-dialog/table-action-dialog';
export * from './task-group-handoff-dialog/task-group-handoff-dialog';
export * from './task-table-action-dialog/task-table-action-dialog';
diff --git a/web-console/src/dialogs/supervisor-to-sql-dialog/supervisor-to-sql-dialog.scss b/web-console/src/dialogs/supervisor-to-sql-dialog/supervisor-to-sql-dialog.scss
new file mode 100644
index 00000000000..f483781fcf6
--- /dev/null
+++ b/web-console/src/dialogs/supervisor-to-sql-dialog/supervisor-to-sql-dialog.scss
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@import '../../variables';
+
+.supervisor-to-sql-dialog {
+ width: 650px;
+
+ .error-message {
+ padding: 10px;
+ background-color: rgba(219, 55, 55, 0.15);
+ border-left: 3px solid #db3737;
+ border-radius: 3px;
+ color: #ff7373;
+ }
+
+ .#{$bp-ns}-form-group {
+ margin-bottom: 20px;
+ }
+
+ .#{$bp-ns}-radio-group {
+ margin-top: 5px;
+ }
+
+ .#{$bp-ns}-text-area {
+ font-family: monospace;
+ font-size: 12px;
+ }
+}
diff --git a/web-console/src/dialogs/supervisor-to-sql-dialog/supervisor-to-sql-dialog.spec.tsx b/web-console/src/dialogs/supervisor-to-sql-dialog/supervisor-to-sql-dialog.spec.tsx
new file mode 100644
index 00000000000..243e7e4cd9c
--- /dev/null
+++ b/web-console/src/dialogs/supervisor-to-sql-dialog/supervisor-to-sql-dialog.spec.tsx
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { render } from '@testing-library/react';
+import React from 'react';
+
+import { Api } from '../../singletons';
+
+import { SupervisorToSqlDialog } from './supervisor-to-sql-dialog';
+
+jest.mock('../../singletons', () => ({
+ Api: {
+ instance: {
+ get: jest.fn(),
+ encodePath: jest.fn((path: string) => encodeURIComponent(path)),
+ },
+ },
+}));
+
+describe('SupervisorToSqlDialog', () => {
+ const mockOnConvert = jest.fn();
+ const mockOnClose = jest.fn();
+
+ const mockSupervisorList = ['wikipedia-kafka', 'clickstream-kafka'];
+
+ const mockSupervisorSpec = {
+ type: 'kafka',
+ spec: {
+ dataSchema: {
+ dataSource: 'wikipedia',
+ timestampSpec: {
+ column: 'timestamp',
+ format: 'iso',
+ },
+ dimensionsSpec: {
+ dimensions: [
+ 'channel',
+ 'user',
+ { name: 'page', type: 'string' },
+ { name: 'namespace', type: 'string' },
+ ],
+ },
+ metricsSpec: [
+ { name: 'count', type: 'count' },
+ { name: 'added', type: 'longSum', fieldName: 'added' },
+ ],
+ },
+ ioConfig: {
+ topic: 'wikipedia',
+ inputSource: {
+ type: 's3',
+ uris: ['s3://my-bucket/wikipedia/data/'],
+ },
+ },
+ },
+ };
+
+ beforeEach(() => {
+ jest.clearAllMocks();
+ (Api.instance.get as jest.Mock).mockImplementation((url: string) => {
+ if (url === '/druid/indexer/v1/supervisor') {
+ return Promise.resolve({ data: mockSupervisorList });
+ }
+ if (url.includes('/druid/indexer/v1/supervisor/')) {
+ return Promise.resolve({ data: mockSupervisorSpec });
+ }
+ return Promise.reject(new Error('Unknown endpoint'));
+ });
+ });
+
+ it('renders the dialog', () => {
+ const { container } = render(
+ React.createElement(SupervisorToSqlDialog, {
+ onConvert: mockOnConvert,
+ onClose: mockOnClose,
+ }),
+ );
+ expect(container).toBeTruthy();
+ });
+
+ it('calls onClose when Close button is clicked', () => {
+ const { getByText } = render(
+ React.createElement(SupervisorToSqlDialog, {
+ onConvert: mockOnConvert,
+ onClose: mockOnClose,
+ }),
+ );
+
+ const closeButton = getByText('Close');
+ closeButton.click();
+
+ expect(mockOnClose).toHaveBeenCalled();
+ });
+});
diff --git a/web-console/src/dialogs/supervisor-to-sql-dialog/supervisor-to-sql-dialog.tsx b/web-console/src/dialogs/supervisor-to-sql-dialog/supervisor-to-sql-dialog.tsx
new file mode 100644
index 00000000000..461f8c7f998
--- /dev/null
+++ b/web-console/src/dialogs/supervisor-to-sql-dialog/supervisor-to-sql-dialog.tsx
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import {
+ Button,
+ Classes,
+ Dialog,
+ FormGroup,
+ InputGroup,
+ Intent,
+ Menu,
+ MenuItem,
+ Popover,
+ Position,
+ Radio,
+ RadioGroup,
+ TextArea,
+} from '@blueprintjs/core';
+import { IconNames } from '@blueprintjs/icons';
+import React, { useState } from 'react';
+
+import { ExternalLink } from '../../components';
+import type { IngestionSpec, QueryWithContext } from '../../druid-models';
+import { convertSupervisorToSql } from '../../helpers/supervisor-conversion';
+import { Api, AppToaster } from '../../singletons';
+import { deepGet, tickIcon } from '../../utils';
+
+import './supervisor-to-sql-dialog.scss';
+
+export interface SupervisorToSqlDialogProps {
+ onConvert(converted: QueryWithContext, datasource?: string): void;
+ onClose(): void;
+}
+
+export const SupervisorToSqlDialog = React.memo(function SupervisorToSqlDialog(
+ props: SupervisorToSqlDialogProps,
+) {
+ const { onConvert, onClose } = props;
+
+ const [supervisorSource, setSupervisorSource] = useState<'select' |
'paste'>('select');
+ const [selectedSupervisor, setSelectedSupervisor] = useState<string>('');
+ const [pastedSupervisor, setPastedSupervisor] = useState<string>('');
+ const [availableSupervisors, setAvailableSupervisors] =
useState<string[]>([]);
+ const [supervisorSpec, setSupervisorSpec] = useState<IngestionSpec |
undefined>();
+
+ const [fileLocation, setFileLocation] = useState<string>('');
+ const [fileType, setFileType] = useState<string>('json');
+
+ const [loading, setLoading] = useState(false);
+ const [error, setError] = useState<string | undefined>();
+
+ React.useEffect(() => {
+ void loadSupervisors();
+ }, []);
+
+ async function loadSupervisors() {
+ try {
+ const supervisors = await
Api.instance.get<string[]>('/druid/indexer/v1/supervisor');
+ // Don't auto-select; leave the button showing "Select supervisor" until
the user picks one
+ setAvailableSupervisors(supervisors.data);
+ } catch (e) {
+ setError(`Failed to load supervisors: ${e.message}`);
+ }
+ }
+
+ async function loadSupervisorSpec(supervisorId: string) {
+ if (!supervisorId) return;
+
+ setLoading(true);
+ setError(undefined);
+
+ try {
+ const resp = await Api.instance.get<IngestionSpec>(
+ `/druid/indexer/v1/supervisor/${Api.encodePath(supervisorId)}`,
+ );
+ setSupervisorSpec(resp.data);
+
+ // Auto-populate file location from ioConfig if available
+ const ioConfig = deepGet(resp.data, 'spec.ioConfig');
+ if (ioConfig?.inputSource?.uris) {
+ setFileLocation(ioConfig.inputSource.uris[0] || '');
+ } else if (ioConfig?.inputSource?.baseDir) {
+ setFileLocation(ioConfig.inputSource.baseDir);
+ }
+ } catch (e) {
+ setError(`Failed to load supervisor spec: ${e.message}`);
+ } finally {
+ setLoading(false);
+ }
+ }
+
+ function parsePastedSupervisor() {
+ if (!pastedSupervisor.trim()) {
+ // Clear any previously parsed spec so a blank/cleared paste can't
submit a stale supervisor
+ setSupervisorSpec(undefined);
+ setError(undefined);
+ return;
+ }
+
+ try {
+ const parsed = JSON.parse(pastedSupervisor);
+ setSupervisorSpec(parsed);
+ setError(undefined);
+
+ // Auto-populate file location from ioConfig if available
+ const ioConfig = deepGet(parsed, 'spec.ioConfig');
+ if (ioConfig?.inputSource?.uris) {
+ setFileLocation(ioConfig.inputSource.uris[0] || '');
+ } else if (ioConfig?.inputSource?.baseDir) {
+ setFileLocation(ioConfig.inputSource.baseDir);
+ }
+ } catch (e) {
+ setError(`Invalid JSON: ${e.message}`);
+ setSupervisorSpec(undefined);
+ }
+ }
+
+ function handleConvert() {
+ if (!supervisorSpec) {
+ AppToaster.show({
+ message: 'No supervisor spec loaded',
+ intent: Intent.DANGER,
+ });
+ return;
+ }
+
+ if (!fileLocation) {
+ AppToaster.show({
+ message: 'Please specify a file location',
+ intent: Intent.DANGER,
+ });
+ return;
+ }
+
+ let converted: QueryWithContext;
+ try {
+ converted = convertSupervisorToSql(supervisorSpec, {
+ fileLocation,
+ fileType,
+ });
+ } catch (e) {
+ AppToaster.show({
+ message: `Could not convert supervisor: ${e.message}`,
+ intent: Intent.DANGER,
+ });
+ return;
+ }
+
+ AppToaster.show({
+ message: 'Supervisor converted to SQL, please review',
+ intent: Intent.SUCCESS,
+ });
+
+ onConvert(converted, deepGet(supervisorSpec,
'spec.dataSchema.dataSource'));
+ }
+
+ React.useEffect(() => {
+ if (supervisorSource !== 'select') return;
+ if (selectedSupervisor) {
+ void loadSupervisorSpec(selectedSupervisor);
+ } else {
+ // No supervisor selected (e.g. none available); don't keep a spec from
paste mode around
+ setSupervisorSpec(undefined);
+ }
+ }, [selectedSupervisor, supervisorSource]);
+
+ React.useEffect(() => {
+ if (supervisorSource !== 'paste') return;
+ // Always reparse on entering paste mode or editing the text so a stale
select-mode spec is
+ // dropped and a cleared paste disables Generate SQL
+ parsePastedSupervisor();
+ }, [pastedSupervisor, supervisorSource]);
+
+ return (
+ <Dialog
+ className="supervisor-to-sql-dialog"
+ isOpen
+ onClose={onClose}
+ title="Convert supervisor to SQL"
+ canOutsideClickClose={false}
+ >
+ <div className={Classes.DIALOG_BODY}>
+ <p>
+ Convert a streaming supervisor specification into an{' '}
+ <ExternalLink
href="https://druid.apache.org/docs/latest/multi-stage-query/">
+ MSQ (Multi-Stage Query)
+ </ExternalLink>{' '}
+ ingestion SQL statement. This generates a one-time batch ingestion
that reads the supplied
+ files — it does not start a streaming ingestion and will not
continuously ingest new data.
+ </p>
+
+ <FormGroup label="Supervisor source">
+ <RadioGroup
+ selectedValue={supervisorSource}
+ onChange={e => setSupervisorSource(e.currentTarget.value as
'select' | 'paste')}
+ >
+ <Radio label="Select existing supervisor" value="select" />
+ <Radio label="Paste supervisor JSON" value="paste" />
+ </RadioGroup>
+ </FormGroup>
+
+ {supervisorSource === 'select' ? (
+ <FormGroup label="Select supervisor">
+ <Popover
+ position={Position.BOTTOM_LEFT}
+ disabled={!availableSupervisors.length}
+ content={
+ <Menu>
+ {availableSupervisors.map(name => (
+ <MenuItem
+ key={name}
+ icon={tickIcon(name === selectedSupervisor)}
+ text={name}
+ onClick={() => setSelectedSupervisor(name)}
+ />
+ ))}
+ </Menu>
+ }
+ >
+ <Button
+ text={selectedSupervisor || 'Select supervisor'}
+ rightIcon={IconNames.CARET_DOWN}
+ disabled={!availableSupervisors.length}
+ />
+ </Popover>
+ </FormGroup>
+ ) : (
+ <FormGroup
+ label="Supervisor JSON"
+ helperText="Paste the complete supervisor specification"
+ >
+ <TextArea
+ value={pastedSupervisor}
+ onChange={e => setPastedSupervisor(e.target.value)}
+ fill
+ rows={10}
+ placeholder='{"type": "kafka", "spec": {...}}'
+ />
+ </FormGroup>
+ )}
+
+ <FormGroup
+ label="File location"
+ helperText="S3 URI, local path, or other supported input source"
+ >
+ <InputGroup
+ value={fileLocation}
+ onChange={e => setFileLocation(e.target.value)}
+ placeholder="s3://my-bucket/path/to/files/"
+ fill
+ />
+ </FormGroup>
+
+ <FormGroup label="File type">
+ <RadioGroup
+ selectedValue={fileType}
+ onChange={e => setFileType(e.currentTarget.value)}
+ inline
+ >
+ <Radio label="JSON" value="json" />
+ <Radio label="CSV" value="csv" />
+ <Radio label="Parquet" value="parquet" />
+ <Radio label="ORC" value="orc" />
+ </RadioGroup>
+ </FormGroup>
+
+ {error && (
+ <FormGroup>
+ <div className="error-message">{error}</div>
+ </FormGroup>
+ )}
+
+ {!supervisorSpec && !loading && (
+ <FormGroup>
+ <div style={{ color: '#999', fontSize: '12px', fontStyle: 'italic'
}}>
+ {supervisorSource === 'select'
+ ? 'Select a supervisor to continue...'
+ : 'Paste a supervisor JSON to continue...'}
+ </div>
+ </FormGroup>
+ )}
+ </div>
+ <div className={Classes.DIALOG_FOOTER}>
+ <div className={Classes.DIALOG_FOOTER_ACTIONS}>
+ <Button text="Close" onClick={onClose} />
+ <Button
+ text="Generate SQL"
+ intent={Intent.PRIMARY}
+ onClick={handleConvert}
+ disabled={!supervisorSpec || !fileLocation || loading}
+ icon={IconNames.CODE}
+ />
+ </div>
+ </div>
+ </Dialog>
+ );
+});
diff --git a/web-console/src/helpers/supervisor-conversion.spec.ts b/web-console/src/helpers/supervisor-conversion.spec.ts
new file mode 100644
index 00000000000..2c94db3ec33
--- /dev/null
+++ b/web-console/src/helpers/supervisor-conversion.spec.ts
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import type { IngestionSpec } from '../druid-models';
+
+import { convertSupervisorToSql } from './supervisor-conversion';
+
+expect.addSnapshotSerializer({
+ test: val => typeof val === 'string',
+ print: String,
+});
+
+function wikipediaSupervisor(): IngestionSpec {
+ return {
+ type: 'kafka',
+ spec: {
+ dataSchema: {
+ dataSource: 'wikipedia',
+ timestampSpec: {
+ column: 'timestamp',
+ format: 'iso',
+ },
+ dimensionsSpec: {
+ dimensions: [
+ 'channel',
+ 'user',
+ { name: 'page', type: 'string' },
+ { name: 'namespace', type: 'string' },
+ ],
+ },
+ metricsSpec: [
+ { name: 'count', type: 'count' },
+ { name: 'sum_added', type: 'longSum', fieldName: 'added' },
+ ],
+ },
+ ioConfig: {
+ topic: 'wikipedia',
+ inputSource: {
+ type: 's3',
+ uris: ['s3://my-bucket/wikipedia/data/'],
+ },
+ },
+ },
+ } as IngestionSpec;
+}
+
+describe('supervisor conversion', () => {
+ describe('convertSupervisorToSql', () => {
+ it('converts a supervisor with dimensions and metrics (rollup -> GROUP
BY)', () => {
+ const converted = convertSupervisorToSql(wikipediaSupervisor(), {
+ fileLocation: 's3://my-bucket/wikipedia/data/',
+ fileType: 'json',
+ });
+
+ expect(converted.queryString).toMatchInlineSnapshot(`
+ -- This SQL query was auto generated from an ingestion spec
+ SET arrayIngestMode = 'array';
+ SET finalizeAggregations = FALSE;
+ SET groupByEnableMultiValueUnnesting = FALSE;
+ REPLACE INTO "wikipedia" OVERWRITE ALL
+ WITH "source" AS (SELECT * FROM TABLE(
+ EXTERN(
+
'{"type":"s3","uris":["s3://my-bucket/wikipedia/data/"],"objectGlob":"**.json"}',
+ '{"type":"json"}'
+ )
+ ) EXTEND ("timestamp" VARCHAR, "channel" VARCHAR, "user" VARCHAR,
"page" VARCHAR, "namespace" VARCHAR, "added" BIGINT))
+ SELECT
+ TIME_PARSE("timestamp") AS "__time",
+ "channel",
+ "user",
+ "page",
+ "namespace",
+ COUNT(*) AS "count",
+ SUM("added") AS "sum_added"
+ FROM "source"
+ GROUP BY 1, 2, 3, 4, 5
+ PARTITIONED BY DAY
+ CLUSTERED BY "channel", "user"
+ `);
+ });
+
+ it('does not GROUP BY when rollup is disabled', () => {
+ const supervisor = wikipediaSupervisor();
+ supervisor.spec.dataSchema.granularitySpec = { rollup: false };
+
+ const converted = convertSupervisorToSql(supervisor, {
+ fileLocation: 's3://my-bucket/wikipedia/data/',
+ fileType: 'json',
+ });
+
+ expect(converted.queryString).not.toContain('GROUP BY');
+ });
+
+ it('declares numeric metric and dimension columns with their native
types', () => {
+ const supervisor = wikipediaSupervisor();
+ supervisor.spec.dataSchema.dimensionsSpec = {
+ dimensions: ['channel', { name: 'delta', type: 'long' }, { name:
'ratio', type: 'double' }],
+ };
+ supervisor.spec.dataSchema.metricsSpec = [
+ { name: 'sum_added', type: 'longSum', fieldName: 'added' },
+ { name: 'unique_user', type: 'thetaSketch', fieldName: 'user' },
+ ];
+
+ const converted = convertSupervisorToSql(supervisor, {
+ fileLocation: 's3://my-bucket/wikipedia/data/',
+ fileType: 'json',
+ });
+
+ // The EXTERN signature carries native types, not all VARCHAR
+ expect(converted.queryString).toContain('"delta" BIGINT');
+ expect(converted.queryString).toContain('"ratio" DOUBLE');
+ expect(converted.queryString).toContain('"added" BIGINT');
+ // Sketch metrics read from a string input column
+ expect(converted.queryString).toContain('"user" VARCHAR');
+ });
+
+ it('converts the full range of supported metric aggregations', () => {
+ const supervisor = wikipediaSupervisor();
+ supervisor.spec.dataSchema.metricsSpec = [
+ { name: 'count', type: 'count' },
+ { name: 'sum_added', type: 'longSum', fieldName: 'added' },
+ { name: 'min_added', type: 'doubleMin', fieldName: 'added' },
+ { name: 'max_added', type: 'floatMax', fieldName: 'added' },
+ { name: 'first_added', type: 'longFirst', fieldName: 'added' },
+ { name: 'last_added', type: 'doubleLast', fieldName: 'added' },
+ { name: 'first_page', type: 'stringFirst', fieldName: 'page' },
+ { name: 'last_page', type: 'stringLast', fieldName: 'page' },
+ { name: 'theta_user', type: 'thetaSketch', fieldName: 'user' },
+ { name: 'hll_user', type: 'HLLSketchBuild', fieldName: 'user' },
+ { name: 'quantiles_added', type: 'quantilesDoublesSketch', fieldName:
'added' },
+ { name: 'unique_user', type: 'hyperUnique', fieldName: 'user' },
+ ];
+
+ const converted = convertSupervisorToSql(supervisor, {
+ fileLocation: 's3://my-bucket/wikipedia/data/',
+ fileType: 'json',
+ });
+
+ expect(converted.queryString).toMatchInlineSnapshot(`
+ -- This SQL query was auto generated from an ingestion spec
+ SET arrayIngestMode = 'array';
+ SET finalizeAggregations = FALSE;
+ SET groupByEnableMultiValueUnnesting = FALSE;
+ REPLACE INTO "wikipedia" OVERWRITE ALL
+ WITH "source" AS (SELECT * FROM TABLE(
+ EXTERN(
+
'{"type":"s3","uris":["s3://my-bucket/wikipedia/data/"],"objectGlob":"**.json"}',
+ '{"type":"json"}'
+ )
+ ) EXTEND ("timestamp" VARCHAR, "channel" VARCHAR, "user" VARCHAR,
"page" VARCHAR, "namespace" VARCHAR, "added" BIGINT))
+ SELECT
+ TIME_PARSE("timestamp") AS "__time",
+ "channel",
+ "user",
+ "page",
+ "namespace",
+ COUNT(*) AS "count",
+ SUM("added") AS "sum_added",
+ MIN("added") AS "min_added",
+ MAX("added") AS "max_added",
+ EARLIEST("added") AS "first_added",
+ LATEST("added") AS "last_added",
+ EARLIEST("page", 128) AS "first_page",
+ LATEST("page", 128) AS "last_page",
+ APPROX_COUNT_DISTINCT_DS_THETA("user") AS "theta_user",
+ APPROX_COUNT_DISTINCT_DS_HLL("user") AS "hll_user",
+ DS_QUANTILES_SKETCH("added") AS "quantiles_added",
+ APPROX_COUNT_DISTINCT_BUILTIN("user") AS "unique_user"
+ FROM "source"
+ GROUP BY 1, 2, 3, 4, 5
+ PARTITIONED BY DAY
+ CLUSTERED BY "channel", "user"
+ `);
+ });
+
+ it('includes non-default sketch arguments', () => {
+ const supervisor = wikipediaSupervisor();
+ supervisor.spec.dataSchema.metricsSpec = [
+ { name: 'theta_user', type: 'thetaSketch', fieldName: 'user', size:
32768 } as any,
+ {
+ name: 'hll_user',
+ type: 'HLLSketchBuild',
+ fieldName: 'user',
+ lgK: 14,
+ tgtHllType: 'HLL_8',
+ } as any,
+ {
+ name: 'quantiles_added',
+ type: 'quantilesDoublesSketch',
+ fieldName: 'added',
+ k: 256,
+ } as any,
+ { name: 'first_page', type: 'stringFirst', fieldName: 'page',
maxStringBytes: 1024 } as any,
+ ];
+
+ const converted = convertSupervisorToSql(supervisor, {
+ fileLocation: 's3://my-bucket/wikipedia/data/',
+ fileType: 'json',
+ });
+
+
expect(converted.queryString).toContain('APPROX_COUNT_DISTINCT_DS_THETA("user",
32768)');
+
expect(converted.queryString).toContain(`APPROX_COUNT_DISTINCT_DS_HLL("user",
14, 'HLL_8')`);
+ expect(converted.queryString).toContain('DS_QUANTILES_SKETCH("added",
256)');
+ expect(converted.queryString).toContain('EARLIEST("page", 1024)');
+ });
+
+ it('emits a comment for unsupported metrics and metrics missing a
fieldName', () => {
+ const supervisor = wikipediaSupervisor();
+ supervisor.spec.dataSchema.metricsSpec = [
+ { name: 'sum_added', type: 'longSum', fieldName: 'added' },
+ { name: 'unsupported', type: 'tDigestSketch', fieldName: 'added' },
+ { name: 'no_field', type: 'longSum' },
+ ];
+
+ const converted = convertSupervisorToSql(supervisor, {
+ fileLocation: 's3://my-bucket/wikipedia/data/',
+ fileType: 'json',
+ });
+
+ expect(converted.queryString).toContain('SUM("added") AS "sum_added"');
+ expect(converted.queryString).toContain('could not convert metric');
+ });
+
+ it('adds objectGlob for an s3 directory location', () => {
+ const converted = convertSupervisorToSql(wikipediaSupervisor(), {
+ fileLocation: 's3://my-bucket/wikipedia/data/',
+ fileType: 'parquet',
+ });
+
+ expect(converted.queryString).toContain('objectGlob');
+ expect(converted.queryString).toContain('**.parquet');
+ });
+
+ it('does not add objectGlob for a single s3 file location', () => {
+ const converted = convertSupervisorToSql(wikipediaSupervisor(), {
+ fileLocation: 's3://my-bucket/wikipedia/data.json',
+ fileType: 'json',
+ });
+
+ expect(converted.queryString).not.toContain('objectGlob');
+ });
+
+ it('builds a google input source for a gs:// location', () => {
+ const converted = convertSupervisorToSql(wikipediaSupervisor(), {
+ fileLocation: 'gs://my-bucket/wikipedia/data/',
+ fileType: 'json',
+ });
+
+ expect(converted.queryString).toContain('"type":"google"');
+ });
+
+ it('builds an http input source for an http(s) location', () => {
+ const converted = convertSupervisorToSql(wikipediaSupervisor(), {
+ fileLocation: 'https://example.com/wikipedia/data.json',
+ fileType: 'json',
+ });
+
+ expect(converted.queryString).toContain('"type":"http"');
+ });
+
+ it('builds a local input source and strips the file:// prefix', () => {
+ const converted = convertSupervisorToSql(wikipediaSupervisor(), {
+ fileLocation: 'file:///var/data/wikipedia/',
+ fileType: 'csv',
+ });
+
+ expect(converted.queryString).toContain('"type":"local"');
+ expect(converted.queryString).toContain('/var/data/wikipedia/');
+ expect(converted.queryString).not.toContain('file://');
+ expect(converted.queryString).toContain('*.csv');
+ });
+
+ it('preserves the supervisor input format settings, overriding only the
type', () => {
+ const supervisor = wikipediaSupervisor();
+ (supervisor.spec.ioConfig as any).inputFormat = {
+ type: 'kafka',
+ valueFormat: { type: 'json', flattenSpec: { fields: [{ name: 'x',
expr: '$.x' }] } },
+ };
+
+ const converted = convertSupervisorToSql(supervisor, {
+ fileLocation: 's3://my-bucket/wikipedia/data/',
+ fileType: 'csv',
+ });
+
+ expect(converted.queryString).toContain('flattenSpec');
+ expect(converted.queryString).toContain('"type":"csv"');
+ expect(converted.queryString).not.toContain('"type":"kafka"');
+ });
+
+ it('applies the supervisor segment and query granularity', () => {
+ const supervisor = wikipediaSupervisor();
+ supervisor.spec.dataSchema.granularitySpec = {
+ segmentGranularity: 'HOUR',
+ queryGranularity: 'hour',
+ };
+
+ const converted = convertSupervisorToSql(supervisor, {
+ fileLocation: 's3://my-bucket/wikipedia/data/',
+ fileType: 'json',
+ });
+
+ expect(converted.queryString).toContain('PARTITIONED BY HOUR');
+
expect(converted.queryString).toContain(`TIME_FLOOR(TIME_PARSE("timestamp"),
'PT1H')`);
+ });
+
+ it('defaults to PARTITIONED BY DAY when no segment granularity is set', ()
=> {
+ const converted = convertSupervisorToSql(wikipediaSupervisor(), {
+ fileLocation: 's3://my-bucket/wikipedia/data/',
+ fileType: 'json',
+ });
+
+ expect(converted.queryString).toContain('PARTITIONED BY DAY');
+ });
+
+ it('clusters by at most the first two dimensions', () => {
+ const supervisor = wikipediaSupervisor();
+ supervisor.spec.dataSchema.dimensionsSpec = {
+ dimensions: ['a', 'b', 'c', 'd'],
+ };
+
+ const converted = convertSupervisorToSql(supervisor, {
+ fileLocation: 's3://my-bucket/wikipedia/data/',
+ fileType: 'json',
+ });
+
+ expect(converted.queryString).toContain('CLUSTERED BY "a", "b"');
+ const clusteredBy = converted.queryString.slice(
+ converted.queryString.indexOf('CLUSTERED BY'),
+ );
+ expect(clusteredBy).not.toContain('"c"');
+ expect(clusteredBy).not.toContain('"d"');
+ });
+
+ it('omits CLUSTERED BY when there are no dimensions', () => {
+ const supervisor = wikipediaSupervisor();
+ supervisor.spec.dataSchema.dimensionsSpec = { dimensions: [] };
+
+ const converted = convertSupervisorToSql(supervisor, {
+ fileLocation: 's3://my-bucket/wikipedia/data/',
+ fileType: 'json',
+ });
+
+ expect(converted.queryString).not.toContain('CLUSTERED BY');
+ });
+
+ it('throws when dataSchema is missing', () => {
+ const supervisor = { type: 'kafka', spec: {} } as unknown as
IngestionSpec;
+
+ expect(() =>
+ convertSupervisorToSql(supervisor, { fileLocation: 's3://x/',
fileType: 'json' }),
+ ).toThrow('Supervisor spec missing dataSchema');
+ });
+ });
+});
diff --git a/web-console/src/helpers/supervisor-conversion.ts b/web-console/src/helpers/supervisor-conversion.ts
new file mode 100644
index 00000000000..a471ed1d65d
--- /dev/null
+++ b/web-console/src/helpers/supervisor-conversion.ts
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import type { IngestionSpec, InputFormat, InputSource, QueryWithContext } from
'../druid-models';
+import { deepGet } from '../utils';
+
+import { convertSpecToSql } from './spec-conversion';
+
+export interface SupervisorConversionOptions {
+ fileLocation: string;
+ fileType: string;
+}
+
+/**
+ * Maps a user supplied file location onto the Druid input source that can read it.
+ *
+ * - `s3://...` and `gs://...` become `s3` / `google` input sources. A location ending in `/` is
+ *   treated as a directory: it is passed under `prefixes` (each entry of `uris` names one exact
+ *   object, so a directory listed there would match nothing) together with an `objectGlob` that
+ *   keeps only the chosen file type. Any other location is passed as a single object URI.
+ * - `http://...` and `https://...` become an `http` input source.
+ * - Everything else is treated as a local directory, with a leading `file://` scheme removed.
+ *
+ * @param fileLocation - URI or path of the file(s) to read
+ * @param fileType - file extension (without the dot) used when filtering files by name
+ * @returns the input source describing `fileLocation`
+ */
+function fileLocationToInputSource(fileLocation: string, fileType: string): InputSource {
+  // Both object stores share the same uris / prefixes / objectGlob properties.
+  let objectStoreType: string | undefined;
+  if (fileLocation.startsWith('s3://')) {
+    objectStoreType = 's3';
+  } else if (fileLocation.startsWith('gs://')) {
+    objectStoreType = 'google';
+  }
+
+  if (objectStoreType) {
+    // A trailing slash means "everything under this prefix", not a single object
+    if (fileLocation.endsWith('/')) {
+      return { type: objectStoreType, prefixes: [fileLocation], objectGlob: `**.${fileType}` };
+    }
+    return { type: objectStoreType, uris: [fileLocation] };
+  }
+
+  if (fileLocation.startsWith('http://') || fileLocation.startsWith('https://')) {
+    return { type: 'http', uris: [fileLocation] };
+  }
+
+  // Default to local for file:// or absolute paths. Only a leading scheme is stripped so that a
+  // path which merely contains "file://" further along is left untouched.
+  const fileScheme = 'file://';
+  const baseDir = fileLocation.startsWith(fileScheme)
+    ? fileLocation.slice(fileScheme.length)
+    : fileLocation;
+  return { type: 'local', baseDir, filter: `*.${fileType}` };
+}
+
+/**
+ * Builds an MSQ ingestion SQL statement from a streaming supervisor spec.
+ *
+ * The supervisor's dataSchema is carried over into an `index_parallel` spec whose ioConfig reads
+ * the files described by `options` instead of a stream, and that spec is handed to
+ * {@link convertSpecToSql}. Column types, timestamp parsing, granularity and metric aggregation
+ * are therefore whatever that converter produces for the rewritten batch spec.
+ *
+ * @param supervisorSpec - the supervisor spec; must contain `spec.dataSchema`
+ * @param options - location and type of the files to read in place of the stream
+ * @returns the result of {@link convertSpecToSql} for the rewritten spec
+ * @throws Error if the supervisor spec has no `spec.dataSchema`
+ */
+export function convertSupervisorToSql(
+  supervisorSpec: IngestionSpec,
+  options: SupervisorConversionOptions,
+): QueryWithContext {
+  const { fileLocation, fileType } = options;
+
+  const dataSchema = deepGet(supervisorSpec, 'spec.dataSchema');
+  if (!dataSchema) throw new Error('Supervisor spec missing dataSchema');
+
+  const fileInputSource = fileLocationToInputSource(fileLocation, fileType);
+
+  // Keep every inputFormat setting of the supervisor (e.g. flattenSpec, CSV columns/header) and
+  // replace only the type with the one chosen for the backfill files.
+  const streamInputFormat: InputFormat | undefined = deepGet(
+    supervisorSpec,
+    'spec.ioConfig.inputFormat',
+  );
+  const fileInputFormat: InputFormat = { ...(streamInputFormat || {}), type: fileType };
+
+  // convertSpecToSql requires a segment granularity, so DAY is used unless the supervisor sets one.
+  const granularitySpec = Object.assign({ segmentGranularity: 'DAY' }, dataSchema.granularitySpec);
+
+  // The first two dimensions become the range partition dimensions, approximating the
+  // partitioning a supervisor would produce.
+  const declaredDimensions: (string | { name: string })[] =
+    deepGet(dataSchema, 'dimensionsSpec.dimensions') || [];
+  const clusterDimensions = declaredDimensions
+    .map(dimension => (typeof dimension === 'string' ? dimension : dimension.name))
+    .slice(0, 2);
+
+  const tuningConfig = clusterDimensions.length
+    ? {
+        type: 'index_parallel',
+        partitionsSpec: { type: 'range', partitionDimensions: clusterDimensions },
+      }
+    : { type: 'index_parallel' };
+
+  const batchSpec: IngestionSpec = {
+    type: 'index_parallel',
+    spec: {
+      dataSchema: { ...dataSchema, granularitySpec },
+      ioConfig: {
+        type: 'index_parallel',
+        inputSource: fileInputSource,
+        inputFormat: fileInputFormat,
+      },
+      tuningConfig,
+    },
+  };
+
+  return convertSpecToSql(batchSpec);
+}
diff --git a/web-console/src/views/workbench-view/workbench-view.tsx
b/web-console/src/views/workbench-view/workbench-view.tsx
index 1b61be3d4eb..5590fa3192e 100644
--- a/web-console/src/views/workbench-view/workbench-view.tsx
+++ b/web-console/src/views/workbench-view/workbench-view.tsx
@@ -33,7 +33,7 @@ import { SqlExpression } from 'druid-query-toolkit';
import React from 'react';
import { MenuCheckbox, SplitterLayout } from '../../components';
-import { SpecDialog, StringInputDialog } from '../../dialogs';
+import { SpecDialog, StringInputDialog, SupervisorToSqlDialog } from
'../../dialogs';
import type {
CapacityInfo,
ConsoleViewId,
@@ -114,6 +114,7 @@ type MoreMenuItem =
| 'history'
| 'prettify'
| 'convert-ingestion-to-sql'
+ | 'convert-supervisor-to-sql'
| 'attach-tab-from-task-id'
| 'open-query-detail-archive'
| 'druid-sql-documentation'
@@ -161,6 +162,7 @@ export interface WorkbenchViewState {
explainDialogOpen: boolean;
historyDialogOpen: boolean;
specDialogOpen: boolean;
+ supervisorToSqlDialogOpen: boolean;
executionSubmitDialogOpen: boolean;
taskIdSubmitDialogOpen: boolean;
renamingTab?: TabEntry;
@@ -218,6 +220,7 @@ export class WorkbenchView extends
React.PureComponent<WorkbenchViewProps, Workb
explainDialogOpen: false,
historyDialogOpen: false,
specDialogOpen: false,
+ supervisorToSqlDialogOpen: false,
executionSubmitDialogOpen: false,
taskIdSubmitDialogOpen: false,
@@ -280,6 +283,10 @@ export class WorkbenchView extends
React.PureComponent<WorkbenchViewProps, Workb
this.setState({ specDialogOpen: true });
};
+ /** Flags the supervisor-to-SQL dialog as open in the component state. */
+ private readonly openSupervisorToSqlDialog = (): void => {
+   this.setState({ supervisorToSqlDialogOpen: true });
+ };
+
private readonly openExecutionSubmitDialog = () => {
this.setState({ executionSubmitDialogOpen: true });
};
@@ -496,6 +503,26 @@ export class WorkbenchView extends
React.PureComponent<WorkbenchViewProps, Workb
);
}
+ /**
+  * Renders the supervisor-to-SQL dialog while `supervisorToSqlDialogOpen` is set; renders
+  * nothing otherwise. A converted query is opened in a new tab and the dialog is then closed.
+  */
+ private renderSupervisorToSqlDialog() {
+   if (!this.state.supervisorToSqlDialogOpen) return;
+
+   const closeDialog = () => this.setState({ supervisorToSqlDialogOpen: false });
+
+   return (
+     <SupervisorToSqlDialog
+       onConvert={(converted, datasource) => {
+         const convertedQuery = WorkbenchQuery.blank()
+           .changeQueryString(converted.queryString)
+           .changeQueryContext(converted.queryContext || {});
+
+         // Open the result in its own tab, then dismiss the dialog
+         this.handleNewTab(convertedQuery, `Convert ${datasource || 'supervisor'}`);
+         closeDialog();
+       }}
+       onClose={closeDialog}
+     />
+   );
+ }
+
private renderExecutionSubmit() {
const { executionSubmitDialogOpen } = this.state;
if (!executionSubmitDialogOpen) return;
@@ -844,6 +871,13 @@ export class WorkbenchView extends
React.PureComponent<WorkbenchViewProps, Workb
onClick={this.openSpecDialog}
/>
)}
+ {!hiddenMoreMenuItems.includes('convert-supervisor-to-sql')
&& (
+ <MenuItem
+ icon={IconNames.EXCHANGE}
+ text="Convert supervisor to SQL"
+ onClick={this.openSupervisorToSqlDialog}
+ />
+ )}
{!hiddenMoreMenuItems.includes('attach-tab-from-task-id') &&
(
<MenuItem
icon={IconNames.DOCUMENT_OPEN}
@@ -1008,6 +1042,7 @@ export class WorkbenchView extends
React.PureComponent<WorkbenchViewProps, Workb
{this.renderConnectExternalDataDialog()}
{this.renderTabRenameDialog()}
{this.renderSpecDialog()}
+ {this.renderSupervisorToSqlDialog()}
{this.renderExecutionSubmit()}
{this.renderTaskIdSubmit()}
<MetadataChangeDetector onChange={() =>
this.metadataQueryManager?.runQuery(null)} />
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]