This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 0cc998d improve spec upgrading (#12072)
0cc998d is described below
commit 0cc998d8a1dad781cf509a739229cc07fbf74641
Author: Vadim Ogievetsky <[email protected]>
AuthorDate: Wed Dec 15 10:28:21 2021 -0800
improve spec upgrading (#12072)
---
.../__snapshots__/ingestion-spec.spec.ts.snap | 77 ------
.../src/druid-models/ingestion-spec.spec.ts | 286 +++++++++++++++++----
web-console/src/druid-models/ingestion-spec.tsx | 64 ++---
3 files changed, 256 insertions(+), 171 deletions(-)
diff --git a/web-console/src/druid-models/__snapshots__/ingestion-spec.spec.ts.snap b/web-console/src/druid-models/__snapshots__/ingestion-spec.spec.ts.snap
deleted file mode 100644
index be83632..0000000
--- a/web-console/src/druid-models/__snapshots__/ingestion-spec.spec.ts.snap
+++ /dev/null
@@ -1,77 +0,0 @@
-// Jest Snapshot v1, https://goo.gl/fbAQLP
-
-exports[`ingestion-spec upgrades 1`] = `
-Object {
- "spec": Object {
- "dataSchema": Object {
- "dataSource": "wikipedia",
- "dimensionsSpec": Object {
- "dimensions": Array [
- "channel",
- "cityName",
- "comment",
- ],
- },
- "granularitySpec": Object {
- "queryGranularity": "hour",
- "rollup": true,
- "segmentGranularity": "day",
- },
- "metricsSpec": Array [
- Object {
- "name": "count",
- "type": "count",
- },
- Object {
- "fieldName": "added",
- "name": "sum_added",
- "type": "longSum",
- },
- ],
- "timestampSpec": Object {
- "column": "timestamp",
- "format": "iso",
- },
- "transformSpec": Object {
- "filter": Object {
- "dimension": "commentLength",
- "type": "selector",
- "value": "35",
- },
- "transforms": Array [
- Object {
- "expression": "concat(\\"channel\\", 'lol')",
- "name": "channel",
- "type": "expression",
- },
- ],
- },
- },
- "ioConfig": Object {
- "inputFormat": Object {
- "flattenSpec": Object {
- "fields": Array [
- Object {
- "expr": "$.cityName",
- "name": "cityNameAlt",
- "type": "path",
- },
- ],
- },
- "type": "json",
- },
- "inputSource": Object {
- "type": "http",
- "uris": Array [
- "https://static.imply.io/data/wikipedia.json.gz",
- ],
- },
- "type": "index_parallel",
- },
- "tuningConfig": Object {
- "type": "index_parallel",
- },
- },
- "type": "index_parallel",
-}
-`;
diff --git a/web-console/src/druid-models/ingestion-spec.spec.ts b/web-console/src/druid-models/ingestion-spec.spec.ts
index 8a7b6bb..b76a6f9 100644
--- a/web-console/src/druid-models/ingestion-spec.spec.ts
+++ b/web-console/src/druid-models/ingestion-spec.spec.ts
@@ -19,7 +19,6 @@
import {
adjustId,
cleanSpec,
- downgradeSpec,
getColumnTypeFromHeaderAndRows,
guessInputFormat,
guessTypeFromSample,
@@ -29,60 +28,164 @@ import {
} from './ingestion-spec';
describe('ingestion-spec', () => {
- const oldSpec = {
- type: 'index_parallel',
- spec: {
- ioConfig: {
- type: 'index_parallel',
- firehose: {
- type: 'http',
- uris: ['https://static.imply.io/data/wikipedia.json.gz'],
+ it('upgrades / downgrades task spec', () => {
+ const oldTaskSpec = {
+ type: 'index_parallel',
+ spec: {
+ ioConfig: {
+ type: 'index_parallel',
+ firehose: {
+ type: 'http',
+ uris: ['https://static.imply.io/data/wikipedia.json.gz'],
+ },
},
- },
- tuningConfig: {
- type: 'index_parallel',
- },
- dataSchema: {
- dataSource: 'wikipedia',
- granularitySpec: {
- segmentGranularity: 'day',
- queryGranularity: 'hour',
- rollup: true,
+ tuningConfig: {
+ type: 'index_parallel',
},
- parser: {
- type: 'string',
- parseSpec: {
- format: 'json',
- timestampSpec: {
- column: 'timestamp',
- format: 'iso',
+ dataSchema: {
+ dataSource: 'wikipedia',
+ granularitySpec: {
+ segmentGranularity: 'day',
+ queryGranularity: 'hour',
+ rollup: true,
+ },
+ parser: {
+ type: 'string',
+ parseSpec: {
+ format: 'json',
+ timestampSpec: {
+ column: 'timestamp',
+ format: 'iso',
+ },
+ dimensionsSpec: {
+ dimensions: ['channel', 'cityName', 'comment'],
+ },
+ flattenSpec: {
+ fields: [
+ {
+ type: 'path',
+ name: 'cityNameAlt',
+ expr: '$.cityName',
+ },
+ ],
+ },
},
- dimensionsSpec: {
- dimensions: ['channel', 'cityName', 'comment'],
+ },
+ transformSpec: {
+ transforms: [
+ {
+ type: 'expression',
+ name: 'channel',
+ expression: 'concat("channel", \'lol\')',
+ },
+ ],
+ filter: {
+ type: 'selector',
+ dimension: 'commentLength',
+ value: '35',
+ },
+ },
+ metricsSpec: [
+ {
+ name: 'count',
+ type: 'count',
+ },
+ {
+ name: 'sum_added',
+ type: 'longSum',
+ fieldName: 'added',
+ },
+ ],
+ },
+ },
+ };
+
+ expect(upgradeSpec(oldTaskSpec)).toEqual({
+ spec: {
+ dataSchema: {
+ dataSource: 'wikipedia',
+ dimensionsSpec: {
+ dimensions: ['channel', 'cityName', 'comment'],
+ },
+ granularitySpec: {
+ queryGranularity: 'hour',
+ rollup: true,
+ segmentGranularity: 'day',
+ },
+ metricsSpec: [
+ {
+ name: 'count',
+ type: 'count',
+ },
+ {
+ fieldName: 'added',
+ name: 'sum_added',
+ type: 'longSum',
+ },
+ ],
+ timestampSpec: {
+ column: 'timestamp',
+ format: 'iso',
+ },
+ transformSpec: {
+ filter: {
+ dimension: 'commentLength',
+ type: 'selector',
+ value: '35',
},
+ transforms: [
+ {
+ expression: 'concat("channel", \'lol\')',
+ name: 'channel',
+ type: 'expression',
+ },
+ ],
+ },
+ },
+ ioConfig: {
+ inputFormat: {
flattenSpec: {
fields: [
{
- type: 'path',
- name: 'cityNameAlt',
expr: '$.cityName',
+ name: 'cityNameAlt',
+ type: 'path',
},
],
},
+ type: 'json',
+ },
+ inputSource: {
+ type: 'http',
+ uris: ['https://static.imply.io/data/wikipedia.json.gz'],
},
+ type: 'index_parallel',
},
- transformSpec: {
- transforms: [
- {
- type: 'expression',
- name: 'channel',
- expression: 'concat("channel", \'lol\')',
+ tuningConfig: {
+ type: 'index_parallel',
+ },
+ },
+ type: 'index_parallel',
+ });
+ });
+
+ it('upgrades / downgrades supervisor spec', () => {
+ const oldSupervisorSpec = {
+ type: 'kafka',
+ dataSchema: {
+ dataSource: 'metrics-kafka',
+ parser: {
+ type: 'string',
+ parseSpec: {
+ format: 'json',
+ timestampSpec: {
+ column: 'timestamp',
+ format: 'auto',
+ },
+ dimensionsSpec: {
+ dimensions: [],
+ dimensionExclusions: ['timestamp', 'value'],
},
- ],
- filter: {
- type: 'selector',
- dimension: 'commentLength',
- value: '35',
},
},
metricsSpec: [
@@ -91,21 +194,100 @@ describe('ingestion-spec', () => {
type: 'count',
},
{
- name: 'sum_added',
- type: 'longSum',
- fieldName: 'added',
+ name: 'value_sum',
+ fieldName: 'value',
+ type: 'doubleSum',
+ },
+ {
+ name: 'value_min',
+ fieldName: 'value',
+ type: 'doubleMin',
+ },
+ {
+ name: 'value_max',
+ fieldName: 'value',
+ type: 'doubleMax',
},
],
+ granularitySpec: {
+ type: 'uniform',
+ segmentGranularity: 'HOUR',
+ queryGranularity: 'NONE',
+ },
},
- },
- };
-
- it('upgrades', () => {
- expect(upgradeSpec(oldSpec)).toMatchSnapshot();
- });
+ tuningConfig: {
+ type: 'kafka',
+ maxRowsPerSegment: 5000000,
+ },
+ ioConfig: {
+ topic: 'metrics',
+ consumerProperties: {
+ 'bootstrap.servers': 'localhost:9092',
+ },
+ taskCount: 1,
+ replicas: 1,
+ taskDuration: 'PT1H',
+ },
+ };
- it('round trips', () => {
- expect(downgradeSpec(upgradeSpec(oldSpec))).toMatchObject(oldSpec);
+ expect(upgradeSpec(oldSupervisorSpec)).toEqual({
+ spec: {
+ dataSchema: {
+ dataSource: 'metrics-kafka',
+ dimensionsSpec: {
+ dimensionExclusions: ['timestamp', 'value'],
+ dimensions: [],
+ },
+ granularitySpec: {
+ queryGranularity: 'NONE',
+ segmentGranularity: 'HOUR',
+ type: 'uniform',
+ },
+ metricsSpec: [
+ {
+ name: 'count',
+ type: 'count',
+ },
+ {
+ fieldName: 'value',
+ name: 'value_sum',
+ type: 'doubleSum',
+ },
+ {
+ fieldName: 'value',
+ name: 'value_min',
+ type: 'doubleMin',
+ },
+ {
+ fieldName: 'value',
+ name: 'value_max',
+ type: 'doubleMax',
+ },
+ ],
+ timestampSpec: {
+ column: 'timestamp',
+ format: 'auto',
+ },
+ },
+ ioConfig: {
+ consumerProperties: {
+ 'bootstrap.servers': 'localhost:9092',
+ },
+ inputFormat: {
+ type: 'json',
+ },
+ replicas: 1,
+ taskCount: 1,
+ taskDuration: 'PT1H',
+ topic: 'metrics',
+ },
+ tuningConfig: {
+ maxRowsPerSegment: 5000000,
+ type: 'kafka',
+ },
+ },
+ type: 'kafka',
+ });
});
it('cleanSpec', () => {
diff --git a/web-console/src/druid-models/ingestion-spec.tsx b/web-console/src/druid-models/ingestion-spec.tsx
index e800fb9..1e5b14d 100644
--- a/web-console/src/druid-models/ingestion-spec.tsx
+++ b/web-console/src/druid-models/ingestion-spec.tsx
@@ -2249,6 +2249,15 @@ export function updateSchemaWithSample(
// ------------------------
export function upgradeSpec(spec: any): Partial<IngestionSpec> {
+ if (deepGet(spec, 'type') && deepGet(spec, 'dataSchema')) {
+ spec = {
+ type: spec.type,
+ spec: deepDelete(spec, 'type'),
+ };
+ }
+
+ if (!deepGet(spec, 'spec.dataSchema.parser')) return spec;
+
if (deepGet(spec, 'spec.ioConfig.firehose')) {
switch (deepGet(spec, 'spec.ioConfig.firehose.type')) {
case 'static-s3':
@@ -2262,51 +2271,22 @@ export function upgradeSpec(spec: any): Partial<IngestionSpec> {
}
spec = deepMove(spec, 'spec.ioConfig.firehose', 'spec.ioConfig.inputSource');
- spec = deepMove(
- spec,
- 'spec.dataSchema.parser.parseSpec.timestampSpec',
- 'spec.dataSchema.timestampSpec',
- );
- spec = deepMove(
- spec,
- 'spec.dataSchema.parser.parseSpec.dimensionsSpec',
- 'spec.dataSchema.dimensionsSpec',
- );
- spec = deepMove(spec, 'spec.dataSchema.parser.parseSpec', 'spec.ioConfig.inputFormat');
- spec = deepDelete(spec, 'spec.dataSchema.parser');
- spec = deepMove(spec, 'spec.ioConfig.inputFormat.format', 'spec.ioConfig.inputFormat.type');
}
- return spec;
-}
-export function downgradeSpec(spec: Partial<IngestionSpec>): Partial<IngestionSpec> {
- if (deepGet(spec, 'spec.ioConfig.inputSource')) {
- spec = deepMove(spec, 'spec.ioConfig.inputFormat.type', 'spec.ioConfig.inputFormat.format');
- spec = deepSet(spec, 'spec.dataSchema.parser', { type: 'string' });
- spec = deepMove(spec, 'spec.ioConfig.inputFormat', 'spec.dataSchema.parser.parseSpec');
- spec = deepMove(
- spec,
- 'spec.dataSchema.dimensionsSpec',
- 'spec.dataSchema.parser.parseSpec.dimensionsSpec',
- );
- spec = deepMove(
- spec,
- 'spec.dataSchema.timestampSpec',
- 'spec.dataSchema.parser.parseSpec.timestampSpec',
- );
- spec = deepMove(spec, 'spec.ioConfig.inputSource', 'spec.ioConfig.firehose');
-
- switch (deepGet(spec, 'spec.ioConfig.firehose.type')) {
- case 's3':
- deepSet(spec, 'spec.ioConfig.firehose.type', 'static-s3');
- break;
+ spec = deepMove(
+ spec,
+ 'spec.dataSchema.parser.parseSpec.timestampSpec',
+ 'spec.dataSchema.timestampSpec',
+ );
+ spec = deepMove(
+ spec,
+ 'spec.dataSchema.parser.parseSpec.dimensionsSpec',
+ 'spec.dataSchema.dimensionsSpec',
+ );
+ spec = deepMove(spec, 'spec.dataSchema.parser.parseSpec', 'spec.ioConfig.inputFormat');
+ spec = deepDelete(spec, 'spec.dataSchema.parser');
+ spec = deepMove(spec, 'spec.ioConfig.inputFormat.format', 'spec.ioConfig.inputFormat.type');
- case 'google':
- deepSet(spec, 'spec.ioConfig.firehose.type', 'static-google-blobstore');
- deepMove(spec, 'spec.ioConfig.firehose.objects', 'spec.ioConfig.firehose.blobs');
- break;
- }
- }
return spec;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]