This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cc998d  improve spec upgrading (#12072)
0cc998d is described below

commit 0cc998d8a1dad781cf509a739229cc07fbf74641
Author: Vadim Ogievetsky <[email protected]>
AuthorDate: Wed Dec 15 10:28:21 2021 -0800

    improve spec upgrading (#12072)
---
 .../__snapshots__/ingestion-spec.spec.ts.snap      |  77 ------
 .../src/druid-models/ingestion-spec.spec.ts        | 286 +++++++++++++++++----
 web-console/src/druid-models/ingestion-spec.tsx    |  64 ++---
 3 files changed, 256 insertions(+), 171 deletions(-)

diff --git a/web-console/src/druid-models/__snapshots__/ingestion-spec.spec.ts.snap b/web-console/src/druid-models/__snapshots__/ingestion-spec.spec.ts.snap
deleted file mode 100644
index be83632..0000000
--- a/web-console/src/druid-models/__snapshots__/ingestion-spec.spec.ts.snap
+++ /dev/null
@@ -1,77 +0,0 @@
-// Jest Snapshot v1, https://goo.gl/fbAQLP
-
-exports[`ingestion-spec upgrades 1`] = `
-Object {
-  "spec": Object {
-    "dataSchema": Object {
-      "dataSource": "wikipedia",
-      "dimensionsSpec": Object {
-        "dimensions": Array [
-          "channel",
-          "cityName",
-          "comment",
-        ],
-      },
-      "granularitySpec": Object {
-        "queryGranularity": "hour",
-        "rollup": true,
-        "segmentGranularity": "day",
-      },
-      "metricsSpec": Array [
-        Object {
-          "name": "count",
-          "type": "count",
-        },
-        Object {
-          "fieldName": "added",
-          "name": "sum_added",
-          "type": "longSum",
-        },
-      ],
-      "timestampSpec": Object {
-        "column": "timestamp",
-        "format": "iso",
-      },
-      "transformSpec": Object {
-        "filter": Object {
-          "dimension": "commentLength",
-          "type": "selector",
-          "value": "35",
-        },
-        "transforms": Array [
-          Object {
-            "expression": "concat(\\"channel\\", 'lol')",
-            "name": "channel",
-            "type": "expression",
-          },
-        ],
-      },
-    },
-    "ioConfig": Object {
-      "inputFormat": Object {
-        "flattenSpec": Object {
-          "fields": Array [
-            Object {
-              "expr": "$.cityName",
-              "name": "cityNameAlt",
-              "type": "path",
-            },
-          ],
-        },
-        "type": "json",
-      },
-      "inputSource": Object {
-        "type": "http",
-        "uris": Array [
-          "https://static.imply.io/data/wikipedia.json.gz",
-        ],
-      },
-      "type": "index_parallel",
-    },
-    "tuningConfig": Object {
-      "type": "index_parallel",
-    },
-  },
-  "type": "index_parallel",
-}
-`;
diff --git a/web-console/src/druid-models/ingestion-spec.spec.ts b/web-console/src/druid-models/ingestion-spec.spec.ts
index 8a7b6bb..b76a6f9 100644
--- a/web-console/src/druid-models/ingestion-spec.spec.ts
+++ b/web-console/src/druid-models/ingestion-spec.spec.ts
@@ -19,7 +19,6 @@
 import {
   adjustId,
   cleanSpec,
-  downgradeSpec,
   getColumnTypeFromHeaderAndRows,
   guessInputFormat,
   guessTypeFromSample,
@@ -29,60 +28,164 @@ import {
 } from './ingestion-spec';
 
 describe('ingestion-spec', () => {
-  const oldSpec = {
-    type: 'index_parallel',
-    spec: {
-      ioConfig: {
-        type: 'index_parallel',
-        firehose: {
-          type: 'http',
-          uris: ['https://static.imply.io/data/wikipedia.json.gz'],
+  it('upgrades / downgrades task spec', () => {
+    const oldTaskSpec = {
+      type: 'index_parallel',
+      spec: {
+        ioConfig: {
+          type: 'index_parallel',
+          firehose: {
+            type: 'http',
+            uris: ['https://static.imply.io/data/wikipedia.json.gz'],
+          },
         },
-      },
-      tuningConfig: {
-        type: 'index_parallel',
-      },
-      dataSchema: {
-        dataSource: 'wikipedia',
-        granularitySpec: {
-          segmentGranularity: 'day',
-          queryGranularity: 'hour',
-          rollup: true,
+        tuningConfig: {
+          type: 'index_parallel',
         },
-        parser: {
-          type: 'string',
-          parseSpec: {
-            format: 'json',
-            timestampSpec: {
-              column: 'timestamp',
-              format: 'iso',
+        dataSchema: {
+          dataSource: 'wikipedia',
+          granularitySpec: {
+            segmentGranularity: 'day',
+            queryGranularity: 'hour',
+            rollup: true,
+          },
+          parser: {
+            type: 'string',
+            parseSpec: {
+              format: 'json',
+              timestampSpec: {
+                column: 'timestamp',
+                format: 'iso',
+              },
+              dimensionsSpec: {
+                dimensions: ['channel', 'cityName', 'comment'],
+              },
+              flattenSpec: {
+                fields: [
+                  {
+                    type: 'path',
+                    name: 'cityNameAlt',
+                    expr: '$.cityName',
+                  },
+                ],
+              },
             },
-            dimensionsSpec: {
-              dimensions: ['channel', 'cityName', 'comment'],
+          },
+          transformSpec: {
+            transforms: [
+              {
+                type: 'expression',
+                name: 'channel',
+                expression: 'concat("channel", \'lol\')',
+              },
+            ],
+            filter: {
+              type: 'selector',
+              dimension: 'commentLength',
+              value: '35',
+            },
+          },
+          metricsSpec: [
+            {
+              name: 'count',
+              type: 'count',
+            },
+            {
+              name: 'sum_added',
+              type: 'longSum',
+              fieldName: 'added',
+            },
+          ],
+        },
+      },
+    };
+
+    expect(upgradeSpec(oldTaskSpec)).toEqual({
+      spec: {
+        dataSchema: {
+          dataSource: 'wikipedia',
+          dimensionsSpec: {
+            dimensions: ['channel', 'cityName', 'comment'],
+          },
+          granularitySpec: {
+            queryGranularity: 'hour',
+            rollup: true,
+            segmentGranularity: 'day',
+          },
+          metricsSpec: [
+            {
+              name: 'count',
+              type: 'count',
+            },
+            {
+              fieldName: 'added',
+              name: 'sum_added',
+              type: 'longSum',
+            },
+          ],
+          timestampSpec: {
+            column: 'timestamp',
+            format: 'iso',
+          },
+          transformSpec: {
+            filter: {
+              dimension: 'commentLength',
+              type: 'selector',
+              value: '35',
             },
+            transforms: [
+              {
+                expression: 'concat("channel", \'lol\')',
+                name: 'channel',
+                type: 'expression',
+              },
+            ],
+          },
+        },
+        ioConfig: {
+          inputFormat: {
             flattenSpec: {
               fields: [
                 {
-                  type: 'path',
-                  name: 'cityNameAlt',
                   expr: '$.cityName',
+                  name: 'cityNameAlt',
+                  type: 'path',
                 },
               ],
             },
+            type: 'json',
+          },
+          inputSource: {
+            type: 'http',
+            uris: ['https://static.imply.io/data/wikipedia.json.gz'],
           },
+          type: 'index_parallel',
         },
-        transformSpec: {
-          transforms: [
-            {
-              type: 'expression',
-              name: 'channel',
-              expression: 'concat("channel", \'lol\')',
+        tuningConfig: {
+          type: 'index_parallel',
+        },
+      },
+      type: 'index_parallel',
+    });
+  });
+
+  it('upgrades / downgrades supervisor spec', () => {
+    const oldSupervisorSpec = {
+      type: 'kafka',
+      dataSchema: {
+        dataSource: 'metrics-kafka',
+        parser: {
+          type: 'string',
+          parseSpec: {
+            format: 'json',
+            timestampSpec: {
+              column: 'timestamp',
+              format: 'auto',
+            },
+            dimensionsSpec: {
+              dimensions: [],
+              dimensionExclusions: ['timestamp', 'value'],
             },
-          ],
-          filter: {
-            type: 'selector',
-            dimension: 'commentLength',
-            value: '35',
           },
         },
         metricsSpec: [
@@ -91,21 +194,100 @@ describe('ingestion-spec', () => {
             type: 'count',
           },
           {
-            name: 'sum_added',
-            type: 'longSum',
-            fieldName: 'added',
+            name: 'value_sum',
+            fieldName: 'value',
+            type: 'doubleSum',
+          },
+          {
+            name: 'value_min',
+            fieldName: 'value',
+            type: 'doubleMin',
+          },
+          {
+            name: 'value_max',
+            fieldName: 'value',
+            type: 'doubleMax',
           },
         ],
+        granularitySpec: {
+          type: 'uniform',
+          segmentGranularity: 'HOUR',
+          queryGranularity: 'NONE',
+        },
       },
-    },
-  };
-
-  it('upgrades', () => {
-    expect(upgradeSpec(oldSpec)).toMatchSnapshot();
-  });
+      tuningConfig: {
+        type: 'kafka',
+        maxRowsPerSegment: 5000000,
+      },
+      ioConfig: {
+        topic: 'metrics',
+        consumerProperties: {
+          'bootstrap.servers': 'localhost:9092',
+        },
+        taskCount: 1,
+        replicas: 1,
+        taskDuration: 'PT1H',
+      },
+    };
 
-  it('round trips', () => {
-    expect(downgradeSpec(upgradeSpec(oldSpec))).toMatchObject(oldSpec);
+    expect(upgradeSpec(oldSupervisorSpec)).toEqual({
+      spec: {
+        dataSchema: {
+          dataSource: 'metrics-kafka',
+          dimensionsSpec: {
+            dimensionExclusions: ['timestamp', 'value'],
+            dimensions: [],
+          },
+          granularitySpec: {
+            queryGranularity: 'NONE',
+            segmentGranularity: 'HOUR',
+            type: 'uniform',
+          },
+          metricsSpec: [
+            {
+              name: 'count',
+              type: 'count',
+            },
+            {
+              fieldName: 'value',
+              name: 'value_sum',
+              type: 'doubleSum',
+            },
+            {
+              fieldName: 'value',
+              name: 'value_min',
+              type: 'doubleMin',
+            },
+            {
+              fieldName: 'value',
+              name: 'value_max',
+              type: 'doubleMax',
+            },
+          ],
+          timestampSpec: {
+            column: 'timestamp',
+            format: 'auto',
+          },
+        },
+        ioConfig: {
+          consumerProperties: {
+            'bootstrap.servers': 'localhost:9092',
+          },
+          inputFormat: {
+            type: 'json',
+          },
+          replicas: 1,
+          taskCount: 1,
+          taskDuration: 'PT1H',
+          topic: 'metrics',
+        },
+        tuningConfig: {
+          maxRowsPerSegment: 5000000,
+          type: 'kafka',
+        },
+      },
+      type: 'kafka',
+    });
   });
 
   it('cleanSpec', () => {
diff --git a/web-console/src/druid-models/ingestion-spec.tsx b/web-console/src/druid-models/ingestion-spec.tsx
index e800fb9..1e5b14d 100644
--- a/web-console/src/druid-models/ingestion-spec.tsx
+++ b/web-console/src/druid-models/ingestion-spec.tsx
@@ -2249,6 +2249,15 @@ export function updateSchemaWithSample(
 // ------------------------
 
 export function upgradeSpec(spec: any): Partial<IngestionSpec> {
+  if (deepGet(spec, 'type') && deepGet(spec, 'dataSchema')) {
+    spec = {
+      type: spec.type,
+      spec: deepDelete(spec, 'type'),
+    };
+  }
+
+  if (!deepGet(spec, 'spec.dataSchema.parser')) return spec;
+
   if (deepGet(spec, 'spec.ioConfig.firehose')) {
     switch (deepGet(spec, 'spec.ioConfig.firehose.type')) {
       case 'static-s3':
@@ -2262,51 +2271,22 @@ export function upgradeSpec(spec: any): Partial<IngestionSpec> {
     }
 
     spec = deepMove(spec, 'spec.ioConfig.firehose', 'spec.ioConfig.inputSource');
-    spec = deepMove(
-      spec,
-      'spec.dataSchema.parser.parseSpec.timestampSpec',
-      'spec.dataSchema.timestampSpec',
-    );
-    spec = deepMove(
-      spec,
-      'spec.dataSchema.parser.parseSpec.dimensionsSpec',
-      'spec.dataSchema.dimensionsSpec',
-    );
-    spec = deepMove(spec, 'spec.dataSchema.parser.parseSpec', 'spec.ioConfig.inputFormat');
-    spec = deepDelete(spec, 'spec.dataSchema.parser');
-    spec = deepMove(spec, 'spec.ioConfig.inputFormat.format', 'spec.ioConfig.inputFormat.type');
   }
-  return spec;
-}
 
-export function downgradeSpec(spec: Partial<IngestionSpec>): Partial<IngestionSpec> {
-  if (deepGet(spec, 'spec.ioConfig.inputSource')) {
-    spec = deepMove(spec, 'spec.ioConfig.inputFormat.type', 'spec.ioConfig.inputFormat.format');
-    spec = deepSet(spec, 'spec.dataSchema.parser', { type: 'string' });
-    spec = deepMove(spec, 'spec.ioConfig.inputFormat', 'spec.dataSchema.parser.parseSpec');
-    spec = deepMove(
-      spec,
-      'spec.dataSchema.dimensionsSpec',
-      'spec.dataSchema.parser.parseSpec.dimensionsSpec',
-    );
-    spec = deepMove(
-      spec,
-      'spec.dataSchema.timestampSpec',
-      'spec.dataSchema.parser.parseSpec.timestampSpec',
-    );
-    spec = deepMove(spec, 'spec.ioConfig.inputSource', 'spec.ioConfig.firehose');
-
-    switch (deepGet(spec, 'spec.ioConfig.firehose.type')) {
-      case 's3':
-        deepSet(spec, 'spec.ioConfig.firehose.type', 'static-s3');
-        break;
+  spec = deepMove(
+    spec,
+    'spec.dataSchema.parser.parseSpec.timestampSpec',
+    'spec.dataSchema.timestampSpec',
+  );
+  spec = deepMove(
+    spec,
+    'spec.dataSchema.parser.parseSpec.dimensionsSpec',
+    'spec.dataSchema.dimensionsSpec',
+  );
+  spec = deepMove(spec, 'spec.dataSchema.parser.parseSpec', 'spec.ioConfig.inputFormat');
+  spec = deepDelete(spec, 'spec.dataSchema.parser');
+  spec = deepMove(spec, 'spec.ioConfig.inputFormat.format', 'spec.ioConfig.inputFormat.type');
 
-      case 'google':
-        deepSet(spec, 'spec.ioConfig.firehose.type', 'static-google-blobstore');
-        deepMove(spec, 'spec.ioConfig.firehose.objects', 'spec.ioConfig.firehose.blobs');
-        break;
-    }
-  }
   return spec;
 }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to