This is an automated email from the ASF dual-hosted git repository.

vogievetsky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c010e42988c feat: Add "Convert to SQL" option for streaming supervisors (#19547)
c010e42988c is described below

commit c010e42988cba553218488912dd87a1f758935e3
Author: Kyle Hoondert <[email protected]>
AuthorDate: Tue Jun 23 22:43:02 2026 +0100

    feat: Add "Convert to SQL" option for streaming supervisors (#19547)
    
    * Add supervisor-to-SQL dialog
    
    * Added tests
    
    * feat: address review feedback on supervisor-to-SQL conversion
    
    - Preserve native column types in the EXTERN signature so numeric metric
      and typed dimension fields are not declared as strings
    - Apply the supervisor's segment granularity to PARTITIONED BY and query
      granularity via TIME_FLOOR (also used in GROUP BY)
    - Preserve the supervisor's inputFormat settings, overriding only the type
    - Escape custom timestamp formats with the query-toolkit literal helper
    - Clear stale specs in the paste-mode dialog so Generate SQL can't submit
      a hidden supervisor
    - Open the converted query in a new workbench tab instead of overwriting
      the active tab
    - Add tests for the new behaviors and update snapshots
    
    Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
    
    * refactor: address review feedback on supervisor-to-SQL conversion
    
    Reuse the existing ingestion-spec converter rather than reimplementing it:
    - convertSupervisorToSql now rewrites the supervisor into an index_parallel
      spec (file inputSource/inputFormat, default segment granularity, leading
      dimension clustering) and delegates to convertSpecToSql, so column types,
      granularity, timestamp parsing, and metric aggregation are all shared
    - Reuse the IngestionSpec interface instead of a bespoke SupervisorSpec; drop
      the duplicated MetricSpec interface and metric-to-SQL helpers (~400 lines)
    - Convert the conversion tests to inline snapshots
    - Dialog: use IngestionSpec, fix Blueprint 5 scss namespace (.#{$bp-ns}),
      replace the native select with a Button + Menu dropdown matching the
      console style, and clarify the SQL is a one-time batch (not streaming)
    - Don't auto-select the first supervisor; the button shows "Select
      supervisor" until one is chosen
    - Remove the single-export index.ts barrel
    
    Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
    
    * style: run prettier on supervisor-to-SQL files
    
    Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
    Co-authored-by: Kyle Hoondert <[email protected]>
---
 web-console/src/dialogs/index.ts                   |   1 +
 .../supervisor-to-sql-dialog.scss                  |  44 +++
 .../supervisor-to-sql-dialog.spec.tsx              | 109 ++++++
 .../supervisor-to-sql-dialog.tsx                   | 312 +++++++++++++++++
 .../src/helpers/supervisor-conversion.spec.ts      | 369 +++++++++++++++++++++
 web-console/src/helpers/supervisor-conversion.ts   | 106 ++++++
 .../src/views/workbench-view/workbench-view.tsx    |  37 ++-
 7 files changed, 977 insertions(+), 1 deletion(-)

diff --git a/web-console/src/dialogs/index.ts b/web-console/src/dialogs/index.ts
index 00212ce566d..abe14cd36b8 100644
--- a/web-console/src/dialogs/index.ts
+++ b/web-console/src/dialogs/index.ts
@@ -39,6 +39,7 @@ export * from './string-input-dialog/string-input-dialog';
 export * from 
'./supervisor-reset-offsets-dialog/supervisor-reset-offsets-dialog';
 export * from 
'./supervisor-reset-to-latest-dialog/supervisor-reset-to-latest-dialog';
 export * from 
'./supervisor-table-action-dialog/supervisor-table-action-dialog';
+export * from './supervisor-to-sql-dialog/supervisor-to-sql-dialog';
 export * from './table-action-dialog/table-action-dialog';
 export * from './task-group-handoff-dialog/task-group-handoff-dialog';
 export * from './task-table-action-dialog/task-table-action-dialog';
diff --git a/web-console/src/dialogs/supervisor-to-sql-dialog/supervisor-to-sql-dialog.scss b/web-console/src/dialogs/supervisor-to-sql-dialog/supervisor-to-sql-dialog.scss
new file mode 100644
index 00000000000..f483781fcf6
--- /dev/null
+++ b/web-console/src/dialogs/supervisor-to-sql-dialog/supervisor-to-sql-dialog.scss
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@import '../../variables';
+
+// Styles for the "Convert supervisor to SQL" dialog; every rule is scoped under the dialog's class
+.supervisor-to-sql-dialog {
+  width: 650px;
+
+  // Inline banner that displays the dialog's `error` message (load failures, invalid pasted JSON)
+  .error-message {
+    padding: 10px;
+    background-color: rgba(219, 55, 55, 0.15);
+    border-left: 3px solid #db3737;
+    border-radius: 3px;
+    color: #ff7373;
+  }
+
+  // Blueprint overrides; $bp-ns is the Blueprint class-name namespace from ../../variables
+  .#{$bp-ns}-form-group {
+    margin-bottom: 20px;
+  }
+
+  .#{$bp-ns}-radio-group {
+    margin-top: 5px;
+  }
+
+  // The only text area in the dialog holds the pasted supervisor JSON
+  .#{$bp-ns}-text-area {
+    font-family: monospace;
+    font-size: 12px;
+  }
+}
diff --git a/web-console/src/dialogs/supervisor-to-sql-dialog/supervisor-to-sql-dialog.spec.tsx b/web-console/src/dialogs/supervisor-to-sql-dialog/supervisor-to-sql-dialog.spec.tsx
new file mode 100644
index 00000000000..243e7e4cd9c
--- /dev/null
+++ b/web-console/src/dialogs/supervisor-to-sql-dialog/supervisor-to-sql-dialog.spec.tsx
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { render } from '@testing-library/react';
+import React from 'react';
+
+import { Api } from '../../singletons';
+
+import { SupervisorToSqlDialog } from './supervisor-to-sql-dialog';
+
+// Mock the singletons the dialog touches. The dialog calls the *static* Api.encodePath(...) (not a
+// method on Api.instance) and AppToaster.show(...), so both must exist for tests that select a
+// supervisor or trigger a conversion.
+jest.mock('../../singletons', () => ({
+  Api: {
+    instance: {
+      get: jest.fn(),
+    },
+    encodePath: jest.fn((path: string) => encodeURIComponent(path)),
+  },
+  AppToaster: {
+    show: jest.fn(),
+  },
+}));
+
+describe('SupervisorToSqlDialog', () => {
+  const mockOnConvert = jest.fn();
+  const mockOnClose = jest.fn();
+
+  const mockSupervisorList = ['wikipedia-kafka', 'clickstream-kafka'];
+
+  const mockSupervisorSpec = {
+    type: 'kafka',
+    spec: {
+      dataSchema: {
+        dataSource: 'wikipedia',
+        timestampSpec: {
+          column: 'timestamp',
+          format: 'iso',
+        },
+        dimensionsSpec: {
+          dimensions: [
+            'channel',
+            'user',
+            { name: 'page', type: 'string' },
+            { name: 'namespace', type: 'string' },
+          ],
+        },
+        metricsSpec: [
+          { name: 'count', type: 'count' },
+          { name: 'added', type: 'longSum', fieldName: 'added' },
+        ],
+      },
+      ioConfig: {
+        topic: 'wikipedia',
+        inputSource: {
+          type: 's3',
+          uris: ['s3://my-bucket/wikipedia/data/'],
+        },
+      },
+    },
+  };
+
+  beforeEach(() => {
+    jest.clearAllMocks();
+    (Api.instance.get as jest.Mock).mockImplementation((url: string) => {
+      if (url === '/druid/indexer/v1/supervisor') {
+        return Promise.resolve({ data: mockSupervisorList });
+      }
+      if (url.includes('/druid/indexer/v1/supervisor/')) {
+        return Promise.resolve({ data: mockSupervisorSpec });
+      }
+      return Promise.reject(new Error('Unknown endpoint'));
+    });
+  });
+
+  // Renders the dialog wired to the shared mock callbacks
+  function renderDialog() {
+    return render(
+      React.createElement(SupervisorToSqlDialog, {
+        onConvert: mockOnConvert,
+        onClose: mockOnClose,
+      }),
+    );
+  }
+
+  it('renders the dialog', () => {
+    // The Blueprint Dialog is portalled into document.body, which getByText searches by default
+    const { getByText } = renderDialog();
+
+    expect(getByText('Convert supervisor to SQL')).toBeTruthy();
+
+    // Nothing has been selected or pasted yet, so there is nothing to convert
+    const generateButton = getByText('Generate SQL').closest('button');
+    expect(generateButton?.disabled).toBe(true);
+  });
+
+  it('calls onClose when Close button is clicked', () => {
+    const { getByText } = renderDialog();
+
+    const closeButton = getByText('Close');
+    closeButton.click();
+
+    expect(mockOnClose).toHaveBeenCalled();
+  });
+});
diff --git a/web-console/src/dialogs/supervisor-to-sql-dialog/supervisor-to-sql-dialog.tsx b/web-console/src/dialogs/supervisor-to-sql-dialog/supervisor-to-sql-dialog.tsx
new file mode 100644
index 00000000000..461f8c7f998
--- /dev/null
+++ b/web-console/src/dialogs/supervisor-to-sql-dialog/supervisor-to-sql-dialog.tsx
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import {
+  Button,
+  Classes,
+  Dialog,
+  FormGroup,
+  InputGroup,
+  Intent,
+  Menu,
+  MenuItem,
+  Popover,
+  Position,
+  Radio,
+  RadioGroup,
+  TextArea,
+} from '@blueprintjs/core';
+import { IconNames } from '@blueprintjs/icons';
+import React, { useState } from 'react';
+
+import { ExternalLink } from '../../components';
+import type { IngestionSpec, QueryWithContext } from '../../druid-models';
+import { convertSupervisorToSql } from '../../helpers/supervisor-conversion';
+import { Api, AppToaster } from '../../singletons';
+import { deepGet, tickIcon } from '../../utils';
+
+import './supervisor-to-sql-dialog.scss';
+
+/** Props for the SupervisorToSqlDialog component. */
+export interface SupervisorToSqlDialogProps {
+  /**
+   * Called with the generated query after a successful conversion. `datasource` is the spec's
+   * `spec.dataSchema.dataSource`, or undefined when the spec does not set one.
+   */
+  onConvert(converted: QueryWithContext, datasource?: string): void;
+  /** Called when the dialog is dismissed (the Close button or the dialog's own close action). */
+  onClose(): void;
+}
+
+/**
+ * Dialog that turns a streaming supervisor spec (picked from the existing supervisors or pasted as
+ * JSON) into a one-time batch MSQ ingestion query that reads the same data back from files.
+ */
+export const SupervisorToSqlDialog = React.memo(function SupervisorToSqlDialog(
+  props: SupervisorToSqlDialogProps,
+) {
+  const { onConvert, onClose } = props;
+
+  const [supervisorSource, setSupervisorSource] = useState<'select' | 'paste'>('select');
+  const [selectedSupervisor, setSelectedSupervisor] = useState<string>('');
+  const [pastedSupervisor, setPastedSupervisor] = useState<string>('');
+  const [availableSupervisors, setAvailableSupervisors] = useState<string[]>([]);
+  const [supervisorSpec, setSupervisorSpec] = useState<IngestionSpec | undefined>();
+
+  const [fileLocation, setFileLocation] = useState<string>('');
+  const [fileType, setFileType] = useState<string>('json');
+
+  const [loading, setLoading] = useState(false);
+  const [error, setError] = useState<string | undefined>();
+
+  React.useEffect(() => {
+    void loadSupervisors();
+  }, []);
+
+  // Fetches the supervisor IDs offered in the "Select supervisor" menu
+  async function loadSupervisors() {
+    try {
+      const supervisors = await Api.instance.get<string[]>('/druid/indexer/v1/supervisor');
+      // Don't auto-select; leave the button showing "Select supervisor" until the user picks one
+      setAvailableSupervisors(supervisors.data);
+    } catch (e) {
+      setError(`Failed to load supervisors: ${e.message}`);
+    }
+  }
+
+  // Pre-fills the file location from spec.ioConfig.inputSource (first URI, else baseDir) when the
+  // spec carries one; otherwise the current value is left untouched
+  function populateFileLocation(spec: IngestionSpec) {
+    const ioConfig = deepGet(spec, 'spec.ioConfig');
+    if (ioConfig?.inputSource?.uris) {
+      setFileLocation(ioConfig.inputSource.uris[0] || '');
+    } else if (ioConfig?.inputSource?.baseDir) {
+      setFileLocation(ioConfig.inputSource.baseDir);
+    }
+  }
+
+  /**
+   * Loads the full spec of one supervisor. `isStale` lets the caller abandon a request whose result
+   * is no longer wanted (a newer selection, a switch to paste mode, or unmount) so a slow response
+   * cannot overwrite the current spec, error, or loading state.
+   */
+  async function loadSupervisorSpec(supervisorId: string, isStale: () => boolean = () => false) {
+    if (!supervisorId) return;
+
+    setLoading(true);
+    setError(undefined);
+
+    try {
+      const resp = await Api.instance.get<IngestionSpec>(
+        `/druid/indexer/v1/supervisor/${Api.encodePath(supervisorId)}`,
+      );
+      if (isStale()) return;
+      setSupervisorSpec(resp.data);
+      populateFileLocation(resp.data);
+    } catch (e) {
+      if (isStale()) return;
+      // Don't leave the previously selected supervisor's spec submittable under the new selection
+      setSupervisorSpec(undefined);
+      setError(`Failed to load supervisor spec: ${e.message}`);
+    } finally {
+      if (!isStale()) setLoading(false);
+    }
+  }
+
+  function parsePastedSupervisor() {
+    if (!pastedSupervisor.trim()) {
+      // Clear any previously parsed spec so a blank/cleared paste can't submit a stale supervisor
+      setSupervisorSpec(undefined);
+      setError(undefined);
+      return;
+    }
+
+    let parsed: any;
+    try {
+      parsed = JSON.parse(pastedSupervisor);
+    } catch (e) {
+      setError(`Invalid JSON: ${e.message}`);
+      setSupervisorSpec(undefined);
+      return;
+    }
+
+    // Valid JSON that is not an object (a number, string, array, or null) cannot be a supervisor
+    if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) {
+      setError('Supervisor JSON must be an object');
+      setSupervisorSpec(undefined);
+      return;
+    }
+
+    setSupervisorSpec(parsed);
+    setError(undefined);
+    populateFileLocation(parsed);
+  }
+
+  function handleConvert() {
+    if (!supervisorSpec) {
+      AppToaster.show({
+        message: 'No supervisor spec loaded',
+        intent: Intent.DANGER,
+      });
+      return;
+    }
+
+    if (!fileLocation) {
+      AppToaster.show({
+        message: 'Please specify a file location',
+        intent: Intent.DANGER,
+      });
+      return;
+    }
+
+    let converted: QueryWithContext;
+    try {
+      converted = convertSupervisorToSql(supervisorSpec, {
+        fileLocation,
+        fileType,
+      });
+    } catch (e) {
+      AppToaster.show({
+        message: `Could not convert supervisor: ${e.message}`,
+        intent: Intent.DANGER,
+      });
+      return;
+    }
+
+    AppToaster.show({
+      message: 'Supervisor converted to SQL, please review',
+      intent: Intent.SUCCESS,
+    });
+
+    onConvert(converted, deepGet(supervisorSpec, 'spec.dataSchema.dataSource'));
+  }
+
+  React.useEffect(() => {
+    let stale = false;
+    if (supervisorSource === 'select') {
+      if (selectedSupervisor) {
+        void loadSupervisorSpec(selectedSupervisor, () => stale);
+      } else {
+        // No supervisor selected (e.g. none available); don't keep a spec from paste mode around
+        setSupervisorSpec(undefined);
+      }
+    }
+    return () => {
+      // A newer selection, a switch to paste mode, or closing the dialog supersedes any in-flight
+      // request: ignore its result and release the loading flag it may have set
+      stale = true;
+      setLoading(false);
+    };
+  }, [selectedSupervisor, supervisorSource]);
+
+  React.useEffect(() => {
+    if (supervisorSource !== 'paste') return;
+    // Always reparse on entering paste mode or editing the text so a stale select-mode spec is
+    // dropped and a cleared paste disables Generate SQL
+    parsePastedSupervisor();
+  }, [pastedSupervisor, supervisorSource]);
+
+  return (
+    <Dialog
+      className="supervisor-to-sql-dialog"
+      isOpen
+      onClose={onClose}
+      title="Convert supervisor to SQL"
+      canOutsideClickClose={false}
+    >
+      <div className={Classes.DIALOG_BODY}>
+        <p>
+          Convert a streaming supervisor specification into an{' '}
+          <ExternalLink href="https://druid.apache.org/docs/latest/multi-stage-query/">
+            MSQ (Multi-Stage Query)
+          </ExternalLink>{' '}
+          ingestion SQL statement. This generates a one-time batch ingestion that reads the supplied
+          files — it does not start a streaming ingestion and will not continuously ingest new data.
+        </p>
+
+        <FormGroup label="Supervisor source">
+          <RadioGroup
+            selectedValue={supervisorSource}
+            onChange={e => setSupervisorSource(e.currentTarget.value as 'select' | 'paste')}
+          >
+            <Radio label="Select existing supervisor" value="select" />
+            <Radio label="Paste supervisor JSON" value="paste" />
+          </RadioGroup>
+        </FormGroup>
+
+        {supervisorSource === 'select' ? (
+          <FormGroup label="Select supervisor">
+            <Popover
+              position={Position.BOTTOM_LEFT}
+              disabled={!availableSupervisors.length}
+              content={
+                <Menu>
+                  {availableSupervisors.map(name => (
+                    <MenuItem
+                      key={name}
+                      icon={tickIcon(name === selectedSupervisor)}
+                      text={name}
+                      onClick={() => setSelectedSupervisor(name)}
+                    />
+                  ))}
+                </Menu>
+              }
+            >
+              <Button
+                text={selectedSupervisor || 'Select supervisor'}
+                rightIcon={IconNames.CARET_DOWN}
+                disabled={!availableSupervisors.length}
+              />
+            </Popover>
+          </FormGroup>
+        ) : (
+          <FormGroup
+            label="Supervisor JSON"
+            helperText="Paste the complete supervisor specification"
+          >
+            <TextArea
+              value={pastedSupervisor}
+              onChange={e => setPastedSupervisor(e.target.value)}
+              fill
+              rows={10}
+              placeholder='{"type": "kafka", "spec": {...}}'
+            />
+          </FormGroup>
+        )}
+
+        <FormGroup
+          label="File location"
+          helperText="S3 URI, local path, or other supported input source"
+        >
+          <InputGroup
+            value={fileLocation}
+            onChange={e => setFileLocation(e.target.value)}
+            placeholder="s3://my-bucket/path/to/files/"
+            fill
+          />
+        </FormGroup>
+
+        <FormGroup label="File type">
+          <RadioGroup
+            selectedValue={fileType}
+            onChange={e => setFileType(e.currentTarget.value)}
+            inline
+          >
+            <Radio label="JSON" value="json" />
+            <Radio label="CSV" value="csv" />
+            <Radio label="Parquet" value="parquet" />
+            <Radio label="ORC" value="orc" />
+          </RadioGroup>
+        </FormGroup>
+
+        {error && (
+          <FormGroup>
+            <div className="error-message">{error}</div>
+          </FormGroup>
+        )}
+
+        {!supervisorSpec && !loading && (
+          <FormGroup>
+            <div style={{ color: '#999', fontSize: '12px', fontStyle: 'italic' }}>
+              {supervisorSource === 'select'
+                ? 'Select a supervisor to continue...'
+                : 'Paste a supervisor JSON to continue...'}
+            </div>
+          </FormGroup>
+        )}
+      </div>
+      <div className={Classes.DIALOG_FOOTER}>
+        <div className={Classes.DIALOG_FOOTER_ACTIONS}>
+          <Button text="Close" onClick={onClose} />
+          <Button
+            text="Generate SQL"
+            intent={Intent.PRIMARY}
+            onClick={handleConvert}
+            disabled={!supervisorSpec || !fileLocation || loading}
+            icon={IconNames.CODE}
+          />
+        </div>
+      </div>
+    </Dialog>
+  );
+});
diff --git a/web-console/src/helpers/supervisor-conversion.spec.ts b/web-console/src/helpers/supervisor-conversion.spec.ts
new file mode 100644
index 00000000000..2c94db3ec33
--- /dev/null
+++ b/web-console/src/helpers/supervisor-conversion.spec.ts
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import type { IngestionSpec } from '../druid-models';
+
+import { convertSupervisorToSql } from './supervisor-conversion';
+
+// Print string values verbatim (no surrounding quotes or escaping) so the inline snapshots in this
+// file read as plain SQL
+expect.addSnapshotSerializer({
+  test: (val: unknown) => typeof val === 'string',
+  print: val => String(val),
+});
+
+/**
+ * Builds a Kafka supervisor spec for the `wikipedia` datasource. A fresh object is returned on every
+ * call, so individual tests can mutate it without affecting each other.
+ */
+const wikipediaSupervisor = (): IngestionSpec =>
+  ({
+    type: 'kafka',
+    spec: {
+      dataSchema: {
+        dataSource: 'wikipedia',
+        timestampSpec: {
+          column: 'timestamp',
+          format: 'iso',
+        },
+        dimensionsSpec: {
+          dimensions: [
+            'channel',
+            'user',
+            { name: 'page', type: 'string' },
+            { name: 'namespace', type: 'string' },
+          ],
+        },
+        metricsSpec: [
+          { name: 'count', type: 'count' },
+          { name: 'sum_added', type: 'longSum', fieldName: 'added' },
+        ],
+      },
+      ioConfig: {
+        topic: 'wikipedia',
+        inputSource: {
+          type: 's3',
+          uris: ['s3://my-bucket/wikipedia/data/'],
+        },
+      },
+    },
+  }) as IngestionSpec;
+
+// Each case converts a supervisor spec with convertSupervisorToSql and checks the generated SQL.
+// The inline snapshots pin the full statement, so any change to the generated SQL shows up here.
+describe('supervisor conversion', () => {
+  describe('convertSupervisorToSql', () => {
+    it('converts a supervisor with dimensions and metrics (rollup -> GROUP 
BY)', () => {
+      const converted = convertSupervisorToSql(wikipediaSupervisor(), {
+        fileLocation: 's3://my-bucket/wikipedia/data/',
+        fileType: 'json',
+      });
+
+      expect(converted.queryString).toMatchInlineSnapshot(`
+        -- This SQL query was auto generated from an ingestion spec
+        SET arrayIngestMode = 'array';
+        SET finalizeAggregations = FALSE;
+        SET groupByEnableMultiValueUnnesting = FALSE;
+        REPLACE INTO "wikipedia" OVERWRITE ALL
+        WITH "source" AS (SELECT * FROM TABLE(
+          EXTERN(
+            
'{"type":"s3","uris":["s3://my-bucket/wikipedia/data/"],"objectGlob":"**.json"}',
+            '{"type":"json"}'
+          )
+        ) EXTEND ("timestamp" VARCHAR, "channel" VARCHAR, "user" VARCHAR, 
"page" VARCHAR, "namespace" VARCHAR, "added" BIGINT))
+        SELECT
+          TIME_PARSE("timestamp") AS "__time",
+          "channel",
+          "user",
+          "page",
+          "namespace",
+          COUNT(*) AS "count",
+          SUM("added") AS "sum_added"
+        FROM "source"
+        GROUP BY 1, 2, 3, 4, 5
+        PARTITIONED BY DAY
+        CLUSTERED BY "channel", "user"
+      `);
+    });
+
+    it('does not GROUP BY when rollup is disabled', () => {
+      const supervisor = wikipediaSupervisor();
+      supervisor.spec.dataSchema.granularitySpec = { rollup: false };
+
+      const converted = convertSupervisorToSql(supervisor, {
+        fileLocation: 's3://my-bucket/wikipedia/data/',
+        fileType: 'json',
+      });
+
+      expect(converted.queryString).not.toContain('GROUP BY');
+    });
+
+    it('declares numeric metric and dimension columns with their native 
types', () => {
+      const supervisor = wikipediaSupervisor();
+      supervisor.spec.dataSchema.dimensionsSpec = {
+        dimensions: ['channel', { name: 'delta', type: 'long' }, { name: 
'ratio', type: 'double' }],
+      };
+      supervisor.spec.dataSchema.metricsSpec = [
+        { name: 'sum_added', type: 'longSum', fieldName: 'added' },
+        { name: 'unique_user', type: 'thetaSketch', fieldName: 'user' },
+      ];
+
+      const converted = convertSupervisorToSql(supervisor, {
+        fileLocation: 's3://my-bucket/wikipedia/data/',
+        fileType: 'json',
+      });
+
+      // The EXTERN signature carries native types, not all VARCHAR
+      expect(converted.queryString).toContain('"delta" BIGINT');
+      expect(converted.queryString).toContain('"ratio" DOUBLE');
+      expect(converted.queryString).toContain('"added" BIGINT');
+      // Sketch metrics read from a string input column
+      expect(converted.queryString).toContain('"user" VARCHAR');
+    });
+
+    it('converts the full range of supported metric aggregations', () => {
+      const supervisor = wikipediaSupervisor();
+      supervisor.spec.dataSchema.metricsSpec = [
+        { name: 'count', type: 'count' },
+        { name: 'sum_added', type: 'longSum', fieldName: 'added' },
+        { name: 'min_added', type: 'doubleMin', fieldName: 'added' },
+        { name: 'max_added', type: 'floatMax', fieldName: 'added' },
+        { name: 'first_added', type: 'longFirst', fieldName: 'added' },
+        { name: 'last_added', type: 'doubleLast', fieldName: 'added' },
+        { name: 'first_page', type: 'stringFirst', fieldName: 'page' },
+        { name: 'last_page', type: 'stringLast', fieldName: 'page' },
+        { name: 'theta_user', type: 'thetaSketch', fieldName: 'user' },
+        { name: 'hll_user', type: 'HLLSketchBuild', fieldName: 'user' },
+        { name: 'quantiles_added', type: 'quantilesDoublesSketch', fieldName: 
'added' },
+        { name: 'unique_user', type: 'hyperUnique', fieldName: 'user' },
+      ];
+
+      const converted = convertSupervisorToSql(supervisor, {
+        fileLocation: 's3://my-bucket/wikipedia/data/',
+        fileType: 'json',
+      });
+
+      expect(converted.queryString).toMatchInlineSnapshot(`
+        -- This SQL query was auto generated from an ingestion spec
+        SET arrayIngestMode = 'array';
+        SET finalizeAggregations = FALSE;
+        SET groupByEnableMultiValueUnnesting = FALSE;
+        REPLACE INTO "wikipedia" OVERWRITE ALL
+        WITH "source" AS (SELECT * FROM TABLE(
+          EXTERN(
+            
'{"type":"s3","uris":["s3://my-bucket/wikipedia/data/"],"objectGlob":"**.json"}',
+            '{"type":"json"}'
+          )
+        ) EXTEND ("timestamp" VARCHAR, "channel" VARCHAR, "user" VARCHAR, 
"page" VARCHAR, "namespace" VARCHAR, "added" BIGINT))
+        SELECT
+          TIME_PARSE("timestamp") AS "__time",
+          "channel",
+          "user",
+          "page",
+          "namespace",
+          COUNT(*) AS "count",
+          SUM("added") AS "sum_added",
+          MIN("added") AS "min_added",
+          MAX("added") AS "max_added",
+          EARLIEST("added") AS "first_added",
+          LATEST("added") AS "last_added",
+          EARLIEST("page", 128) AS "first_page",
+          LATEST("page", 128) AS "last_page",
+          APPROX_COUNT_DISTINCT_DS_THETA("user") AS "theta_user",
+          APPROX_COUNT_DISTINCT_DS_HLL("user") AS "hll_user",
+          DS_QUANTILES_SKETCH("added") AS "quantiles_added",
+          APPROX_COUNT_DISTINCT_BUILTIN("user") AS "unique_user"
+        FROM "source"
+        GROUP BY 1, 2, 3, 4, 5
+        PARTITIONED BY DAY
+        CLUSTERED BY "channel", "user"
+      `);
+    });
+
+    it('includes non-default sketch arguments', () => {
+      const supervisor = wikipediaSupervisor();
+      // Extra sketch/string parameters are not part of the metric type, hence the `as any` casts
+      supervisor.spec.dataSchema.metricsSpec = [
+        { name: 'theta_user', type: 'thetaSketch', fieldName: 'user', size: 
32768 } as any,
+        {
+          name: 'hll_user',
+          type: 'HLLSketchBuild',
+          fieldName: 'user',
+          lgK: 14,
+          tgtHllType: 'HLL_8',
+        } as any,
+        {
+          name: 'quantiles_added',
+          type: 'quantilesDoublesSketch',
+          fieldName: 'added',
+          k: 256,
+        } as any,
+        { name: 'first_page', type: 'stringFirst', fieldName: 'page', 
maxStringBytes: 1024 } as any,
+      ];
+
+      const converted = convertSupervisorToSql(supervisor, {
+        fileLocation: 's3://my-bucket/wikipedia/data/',
+        fileType: 'json',
+      });
+
+      
expect(converted.queryString).toContain('APPROX_COUNT_DISTINCT_DS_THETA("user", 
32768)');
+      
expect(converted.queryString).toContain(`APPROX_COUNT_DISTINCT_DS_HLL("user", 
14, 'HLL_8')`);
+      expect(converted.queryString).toContain('DS_QUANTILES_SKETCH("added", 
256)');
+      expect(converted.queryString).toContain('EARLIEST("page", 1024)');
+    });
+
+    it('emits a comment for unsupported metrics and metrics missing a 
fieldName', () => {
+      const supervisor = wikipediaSupervisor();
+      supervisor.spec.dataSchema.metricsSpec = [
+        { name: 'sum_added', type: 'longSum', fieldName: 'added' },
+        { name: 'unsupported', type: 'tDigestSketch', fieldName: 'added' },
+        { name: 'no_field', type: 'longSum' },
+      ];
+
+      const converted = convertSupervisorToSql(supervisor, {
+        fileLocation: 's3://my-bucket/wikipedia/data/',
+        fileType: 'json',
+      });
+
+      expect(converted.queryString).toContain('SUM("added") AS "sum_added"');
+      // NOTE(review): this only asserts that some "could not convert metric" text appears; it does
+      // not check that both the tDigestSketch metric and the no_field metric are reported
+      expect(converted.queryString).toContain('could not convert metric');
+    });
+
+    // A trailing slash marks an s3 location as a directory
+    it('adds objectGlob for an s3 directory location', () => {
+      const converted = convertSupervisorToSql(wikipediaSupervisor(), {
+        fileLocation: 's3://my-bucket/wikipedia/data/',
+        fileType: 'parquet',
+      });
+
+      expect(converted.queryString).toContain('objectGlob');
+      expect(converted.queryString).toContain('**.parquet');
+    });
+
+    it('does not add objectGlob for a single s3 file location', () => {
+      const converted = convertSupervisorToSql(wikipediaSupervisor(), {
+        fileLocation: 's3://my-bucket/wikipedia/data.json',
+        fileType: 'json',
+      });
+
+      expect(converted.queryString).not.toContain('objectGlob');
+    });
+
+    it('builds a google input source for a gs:// location', () => {
+      const converted = convertSupervisorToSql(wikipediaSupervisor(), {
+        fileLocation: 'gs://my-bucket/wikipedia/data/',
+        fileType: 'json',
+      });
+
+      expect(converted.queryString).toContain('"type":"google"');
+    });
+
+    it('builds an http input source for an http(s) location', () => {
+      const converted = convertSupervisorToSql(wikipediaSupervisor(), {
+        fileLocation: 'https://example.com/wikipedia/data.json',
+        fileType: 'json',
+      });
+
+      expect(converted.queryString).toContain('"type":"http"');
+    });
+
+    // Local locations become baseDir + a single-level `*.<fileType>` filter
+    it('builds a local input source and strips the file:// prefix', () => {
+      const converted = convertSupervisorToSql(wikipediaSupervisor(), {
+        fileLocation: 'file:///var/data/wikipedia/',
+        fileType: 'csv',
+      });
+
+      expect(converted.queryString).toContain('"type":"local"');
+      expect(converted.queryString).toContain('/var/data/wikipedia/');
+      expect(converted.queryString).not.toContain('file://');
+      expect(converted.queryString).toContain('*.csv');
+    });
+
+    it('preserves the supervisor input format settings, overriding only the 
type', () => {
+      const supervisor = wikipediaSupervisor();
+      // A Kafka wrapper format: only its `type` should be replaced by the chosen file type
+      (supervisor.spec.ioConfig as any).inputFormat = {
+        type: 'kafka',
+        valueFormat: { type: 'json', flattenSpec: { fields: [{ name: 'x', 
expr: '$.x' }] } },
+      };
+
+      const converted = convertSupervisorToSql(supervisor, {
+        fileLocation: 's3://my-bucket/wikipedia/data/',
+        fileType: 'csv',
+      });
+
+      expect(converted.queryString).toContain('flattenSpec');
+      expect(converted.queryString).toContain('"type":"csv"');
+      expect(converted.queryString).not.toContain('"type":"kafka"');
+    });
+
+    it('applies the supervisor segment and query granularity', () => {
+      const supervisor = wikipediaSupervisor();
+      supervisor.spec.dataSchema.granularitySpec = {
+        segmentGranularity: 'HOUR',
+        queryGranularity: 'hour',
+      };
+
+      const converted = convertSupervisorToSql(supervisor, {
+        fileLocation: 's3://my-bucket/wikipedia/data/',
+        fileType: 'json',
+      });
+
+      expect(converted.queryString).toContain('PARTITIONED BY HOUR');
+      
expect(converted.queryString).toContain(`TIME_FLOOR(TIME_PARSE("timestamp"), 
'PT1H')`);
+    });
+
+    it('defaults to PARTITIONED BY DAY when no segment granularity is set', () 
=> {
+      const converted = convertSupervisorToSql(wikipediaSupervisor(), {
+        fileLocation: 's3://my-bucket/wikipedia/data/',
+        fileType: 'json',
+      });
+
+      expect(converted.queryString).toContain('PARTITIONED BY DAY');
+    });
+
+    it('clusters by at most the first two dimensions', () => {
+      const supervisor = wikipediaSupervisor();
+      supervisor.spec.dataSchema.dimensionsSpec = {
+        dimensions: ['a', 'b', 'c', 'd'],
+      };
+
+      const converted = convertSupervisorToSql(supervisor, {
+        fileLocation: 's3://my-bucket/wikipedia/data/',
+        fileType: 'json',
+      });
+
+      expect(converted.queryString).toContain('CLUSTERED BY "a", "b"');
+      // Only inspect the tail of the query so the SELECT list (which contains "c" and "d") is ignored
+      const clusteredBy = converted.queryString.slice(
+        converted.queryString.indexOf('CLUSTERED BY'),
+      );
+      expect(clusteredBy).not.toContain('"c"');
+      expect(clusteredBy).not.toContain('"d"');
+    });
+
+    it('omits CLUSTERED BY when there are no dimensions', () => {
+      const supervisor = wikipediaSupervisor();
+      supervisor.spec.dataSchema.dimensionsSpec = { dimensions: [] };
+
+      const converted = convertSupervisorToSql(supervisor, {
+        fileLocation: 's3://my-bucket/wikipedia/data/',
+        fileType: 'json',
+      });
+
+      expect(converted.queryString).not.toContain('CLUSTERED BY');
+    });
+
+    it('throws when dataSchema is missing', () => {
+      const supervisor = { type: 'kafka', spec: {} } as unknown as 
IngestionSpec;
+
+      expect(() =>
+        convertSupervisorToSql(supervisor, { fileLocation: 's3://x/', 
fileType: 'json' }),
+      ).toThrow('Supervisor spec missing dataSchema');
+    });
+  });
+});
diff --git a/web-console/src/helpers/supervisor-conversion.ts b/web-console/src/helpers/supervisor-conversion.ts
new file mode 100644
index 00000000000..a471ed1d65d
--- /dev/null
+++ b/web-console/src/helpers/supervisor-conversion.ts
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import type { IngestionSpec, InputFormat, InputSource, QueryWithContext } from 
'../druid-models';
+import { deepGet } from '../utils';
+
+import { convertSpecToSql } from './spec-conversion';
+
/**
 * User-chosen settings describing the files to backfill from when converting a supervisor to SQL.
 */
export interface SupervisorConversionOptions {
  /** File type of the backfill files (e.g. `json`); used as the inputFormat type and file extension. */
  fileType: string;
  /** Where the files live: an `s3://`, `gs://`, `http(s)://` or `file://` URI, or a local path. */
  fileLocation: string;
}
+
/**
 * Builds the batch input source that reads the backfill files found at `fileLocation`.
 *
 * - `s3://` and `gs://` locations ending in `/` are treated as directories. They are listed via
 *   `prefixes`, and an `objectGlob` restricts the listing to files of the chosen type. Passing a
 *   directory in `uris` would name a single object, which the glob then filters out, so nothing
 *   would be ingested. Other `s3://` / `gs://` locations are read as single objects via `uris`.
 * - `http://` / `https://` locations are read via `uris`.
 * - Anything else is treated as a local directory that is searched for `*.<fileType>` files. A
 *   leading `file://` is stripped first.
 *
 * @param fileLocation - URI or path of the backfill data
 * @param fileType - file type of the backfill files, used as the file-name extension to match
 * @returns the input source for the generated `index_parallel` ioConfig
 */
function fileLocationToInputSource(fileLocation: string, fileType: string): InputSource {
  const isDirectory = fileLocation.endsWith('/');
  const objectGlob = `**.${fileType}`;

  if (fileLocation.startsWith('s3://')) {
    return isDirectory
      ? { type: 's3', prefixes: [fileLocation], objectGlob }
      : { type: 's3', uris: [fileLocation] };
  }

  if (fileLocation.startsWith('gs://')) {
    return isDirectory
      ? { type: 'google', prefixes: [fileLocation], objectGlob }
      : { type: 'google', uris: [fileLocation] };
  }

  if (fileLocation.startsWith('http://') || fileLocation.startsWith('https://')) {
    return { type: 'http', uris: [fileLocation] };
  }

  // Default to a local directory for file:// URIs and plain paths. Only a *leading* file:// is
  // stripped; String#replace with a string pattern would remove the first match anywhere.
  // NOTE(review): the local input source expects baseDir to be a directory, so pointing this at a
  // single local file is not supported here.
  return {
    type: 'local',
    baseDir: fileLocation.replace(/^file:\/\//, ''),
    filter: `*.${fileType}`,
  };
}
+
/**
 * Derives the inputFormat for the backfill files from the supervisor's stream inputFormat.
 *
 * The stream-only `kafka` and `kinesis` formats wrap the record payload format in `valueFormat`
 * alongside stream metadata (keys, headers, timestamps) that files do not have. For those, the
 * inner `valueFormat` is used as the base. The remaining settings (e.g. flattenSpec, CSV
 * columns/header) are preserved, and only `type` is overridden with the chosen file type.
 */
function toBackfillInputFormat(
  supervisorInputFormat: InputFormat | undefined,
  fileType: string,
): InputFormat {
  let baseFormat = supervisorInputFormat;
  if (baseFormat && (baseFormat.type === 'kafka' || baseFormat.type === 'kinesis')) {
    baseFormat = deepGet(baseFormat, 'valueFormat');
  }
  return baseFormat ? { ...baseFormat, type: fileType } : { type: fileType };
}

/**
 * Converts a streaming supervisor spec to an MSQ ingestion SQL statement.
 *
 * A supervisor's dataSchema has the same shape as a batch ingestion spec's. Only the ioConfig
 * differs: it reads a stream rather than files. So rather than reimplement the conversion, this
 * rewrites the supervisor into an `index_parallel` spec that reads from the chosen files and
 * delegates to the existing {@link convertSpecToSql}. That reuses all of its column-type,
 * timestamp, granularity and metric aggregation handling.
 *
 * @param supervisorSpec - the supervisor spec; must contain `spec.dataSchema`
 * @param options - location and file type of the files to backfill from
 * @returns the SQL statement (and query context) produced by {@link convertSpecToSql}
 * @throws Error when the supervisor spec has no `spec.dataSchema`
 */
export function convertSupervisorToSql(
  supervisorSpec: IngestionSpec,
  options: SupervisorConversionOptions,
): QueryWithContext {
  const { fileLocation, fileType } = options;

  const dataSchema = deepGet(supervisorSpec, 'spec.dataSchema');
  if (!dataSchema) {
    throw new Error('Supervisor spec missing dataSchema');
  }

  const inputSource = fileLocationToInputSource(fileLocation, fileType);
  const inputFormat = toBackfillInputFormat(
    deepGet(supervisorSpec, 'spec.ioConfig.inputFormat'),
    fileType,
  );

  // convertSpecToSql requires a segment granularity, so default to DAY when the supervisor omits
  // one. `??` ensures an explicit null/undefined value cannot override the default.
  const granularitySpec = {
    ...dataSchema.granularitySpec,
    segmentGranularity: dataSchema.granularitySpec?.segmentGranularity ?? 'DAY',
  };

  // Cluster by the leading dimensions, approximating the partitioning a supervisor would produce.
  // Entries without a usable name are skipped so they cannot yield an invalid CLUSTERED BY column.
  const dimensionNames: string[] = (deepGet(dataSchema, 'dimensionsSpec.dimensions') || [])
    .map((d: any) => (typeof d === 'string' ? d : d?.name))
    .filter((name: unknown): name is string => typeof name === 'string' && name !== '');

  const batchSpec: IngestionSpec = {
    type: 'index_parallel',
    spec: {
      dataSchema: { ...dataSchema, granularitySpec },
      ioConfig: { type: 'index_parallel', inputSource, inputFormat },
      tuningConfig: {
        type: 'index_parallel',
        ...(dimensionNames.length
          ? { partitionsSpec: { type: 'range', partitionDimensions: dimensionNames.slice(0, 2) } }
          : {}),
      },
    },
  };

  return convertSpecToSql(batchSpec);
}
diff --git a/web-console/src/views/workbench-view/workbench-view.tsx 
b/web-console/src/views/workbench-view/workbench-view.tsx
index 1b61be3d4eb..5590fa3192e 100644
--- a/web-console/src/views/workbench-view/workbench-view.tsx
+++ b/web-console/src/views/workbench-view/workbench-view.tsx
@@ -33,7 +33,7 @@ import { SqlExpression } from 'druid-query-toolkit';
 import React from 'react';
 
 import { MenuCheckbox, SplitterLayout } from '../../components';
-import { SpecDialog, StringInputDialog } from '../../dialogs';
+import { SpecDialog, StringInputDialog, SupervisorToSqlDialog } from 
'../../dialogs';
 import type {
   CapacityInfo,
   ConsoleViewId,
@@ -114,6 +114,7 @@ type MoreMenuItem =
   | 'history'
   | 'prettify'
   | 'convert-ingestion-to-sql'
+  | 'convert-supervisor-to-sql'
   | 'attach-tab-from-task-id'
   | 'open-query-detail-archive'
   | 'druid-sql-documentation'
@@ -161,6 +162,7 @@ export interface WorkbenchViewState {
   explainDialogOpen: boolean;
   historyDialogOpen: boolean;
   specDialogOpen: boolean;
+  supervisorToSqlDialogOpen: boolean;
   executionSubmitDialogOpen: boolean;
   taskIdSubmitDialogOpen: boolean;
   renamingTab?: TabEntry;
@@ -218,6 +220,7 @@ export class WorkbenchView extends 
React.PureComponent<WorkbenchViewProps, Workb
       explainDialogOpen: false,
       historyDialogOpen: false,
       specDialogOpen: false,
+      supervisorToSqlDialogOpen: false,
       executionSubmitDialogOpen: false,
       taskIdSubmitDialogOpen: false,
 
@@ -280,6 +283,10 @@ export class WorkbenchView extends 
React.PureComponent<WorkbenchViewProps, Workb
     this.setState({ specDialogOpen: true });
   };
 
+  private readonly openSupervisorToSqlDialog = () => {
+    this.setState({ supervisorToSqlDialogOpen: true });
+  };
+
   private readonly openExecutionSubmitDialog = () => {
     this.setState({ executionSubmitDialogOpen: true });
   };
@@ -496,6 +503,26 @@ export class WorkbenchView extends 
React.PureComponent<WorkbenchViewProps, Workb
     );
   }
 
+  private renderSupervisorToSqlDialog() {
+    const { supervisorToSqlDialogOpen } = this.state;
+    if (!supervisorToSqlDialogOpen) return;
+
+    return (
+      <SupervisorToSqlDialog
+        onConvert={(converted, datasource) => {
+          this.handleNewTab(
+            WorkbenchQuery.blank()
+              .changeQueryString(converted.queryString)
+              .changeQueryContext(converted.queryContext || {}),
+            `Convert ${datasource || 'supervisor'}`,
+          );
+          this.setState({ supervisorToSqlDialogOpen: false });
+        }}
+        onClose={() => this.setState({ supervisorToSqlDialogOpen: false })}
+      />
+    );
+  }
+
   private renderExecutionSubmit() {
     const { executionSubmitDialogOpen } = this.state;
     if (!executionSubmitDialogOpen) return;
@@ -844,6 +871,13 @@ export class WorkbenchView extends 
React.PureComponent<WorkbenchViewProps, Workb
                       onClick={this.openSpecDialog}
                     />
                   )}
+                  {!hiddenMoreMenuItems.includes('convert-supervisor-to-sql') 
&& (
+                    <MenuItem
+                      icon={IconNames.EXCHANGE}
+                      text="Convert supervisor to SQL"
+                      onClick={this.openSupervisorToSqlDialog}
+                    />
+                  )}
                   {!hiddenMoreMenuItems.includes('attach-tab-from-task-id') && 
(
                     <MenuItem
                       icon={IconNames.DOCUMENT_OPEN}
@@ -1008,6 +1042,7 @@ export class WorkbenchView extends 
React.PureComponent<WorkbenchViewProps, Workb
         {this.renderConnectExternalDataDialog()}
         {this.renderTabRenameDialog()}
         {this.renderSpecDialog()}
+        {this.renderSupervisorToSqlDialog()}
         {this.renderExecutionSubmit()}
         {this.renderTaskIdSubmit()}
         <MetadataChangeDetector onChange={() => 
this.metadataQueryManager?.runQuery(null)} />


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to