This is an automated email from the ASF dual-hosted git repository.
vogievetsky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 8d125b7c7f Web console: segment writing progress indication (#13929)
8d125b7c7f is described below
commit 8d125b7c7ffe74c32c0e1495aca84b7318285d98
Author: Vadim Ogievetsky <[email protected]>
AuthorDate: Wed Mar 22 16:34:38 2023 -0700
Web console: segment writing progress indication (#13929)
* add segment writing progress indication
* update with more metrics
* add push metric
---
web-console/src/druid-models/stages/stages.ts | 55 ++++++++++++---
.../execution-details-pane.tsx | 4 +-
.../execution-stages-pane.tsx | 81 ++++++++++++++++++++--
.../ingest-success-pane.spec.tsx.snap | 2 +-
.../ingest-success-pane/ingest-success-pane.tsx | 2 +-
5 files changed, 125 insertions(+), 19 deletions(-)
diff --git a/web-console/src/druid-models/stages/stages.ts
b/web-console/src/druid-models/stages/stages.ts
index 7b3f719dbe..7383840cf7 100644
--- a/web-console/src/druid-models/stages/stages.ts
+++ b/web-console/src/druid-models/stages/stages.ts
@@ -102,6 +102,7 @@ export interface StageWorkerCounter {
output?: ChannelCounter;
shuffle?: ChannelCounter;
sortProgress?: SortProgressCounter;
+ segmentGenerationProgress?: SegmentGenerationProgressCounter;
warnings?: WarningCounter;
}
@@ -146,6 +147,20 @@ export interface SortProgressCounter {
triviallyComplete?: boolean;
}
+export interface SegmentGenerationProgressCounter {
+ type: 'segmentGenerationProgress';
+ rowsProcessed: number;
+ rowsPersisted: number;
+ rowsMerged: number;
+ rowsPushed: number;
+}
+
+export type SegmentGenerationProgressFields =
+ | 'rowsProcessed'
+ | 'rowsPersisted'
+ | 'rowsMerged'
+ | 'rowsPushed';
+
export interface WarningCounter {
type: 'warning';
CannotParseExternalData?: number;
@@ -157,6 +172,7 @@ export interface SimpleWideCounter {
[k: `input${number}`]: Record<ChannelFields, number> | undefined;
output?: Record<ChannelFields, number>;
shuffle?: Record<ChannelFields, number>;
+ segmentGenerationProgress?: SegmentGenerationProgressCounter;
}
function zeroChannelFields(): Record<ChannelFields, number> {
@@ -173,8 +189,12 @@ export class Stages {
static readonly QUERY_START_FACTOR = 0.05;
static readonly QUERY_END_FACTOR = 0.05;
+ static stageType(stage: StageDefinition): string {
+ return stage.definition.processor.type;
+ }
+
static stageWeight(stage: StageDefinition): number {
- return stage.definition.processor.type === 'limit' ? 0.1 : 1;
+ return Stages.stageType(stage) === 'limit' ? 0.1 : 1;
}
public readonly stages: StageDefinition[];
@@ -214,6 +234,9 @@ export class Stages {
case 'shuffle':
return 'Shuffle output';
+ case 'segmentGenerationProgress':
+ return 'Segment generation';
+
default:
if (counterName.startsWith('input')) {
const inputIndex = Number(counterName.replace('input', ''));
@@ -230,7 +253,7 @@ export class Stages {
}
stageHasOutput(stage: StageDefinition): boolean {
- return stage.definition.processor.type !== 'segmentGenerator';
+ return Stages.stageType(stage) !== 'segmentGenerator';
}
stageHasSort(stage: StageDefinition): boolean {
@@ -287,13 +310,16 @@ export class Stages {
) / inputFileCount
);
} else {
- // Otherwise, base it on the stage input divided by the output of all
non-broadcast input stages
+ // Otherwise, base it on the stage input divided by the output of all
non-broadcast input stages,
+ // use the segment generation counter in the special case of a
segmentGenerator stage
return zeroDivide(
- sum(input, (inputSource, i) =>
- inputSource.type === 'stage' && !broadcast?.includes(i)
- ? this.getTotalCounterForStage(stage, `input${i}`, 'rows')
- : 0,
- ),
+ Stages.stageType(stage) === 'segmentGenerator'
+ ? this.getTotalSegmentGenerationProgressForStage(stage, 'rowsPushed')
+ : sum(input, (inputSource, i) =>
+ inputSource.type === 'stage' && !broadcast?.includes(i)
+ ? this.getTotalCounterForStage(stage, `input${i}`, 'rows')
+ : 0,
+ ),
sum(input, (inputSource, i) =>
inputSource.type === 'stage' && !broadcast?.includes(i)
? this.getTotalOutputForStage(stages[inputSource.stage], 'rows')
@@ -400,6 +426,15 @@ export class Stages {
);
}
+ getTotalSegmentGenerationProgressForStage(
+ stage: StageDefinition,
+ field: SegmentGenerationProgressFields,
+ ): number {
+ const { counters } = this;
+ if (!counters) return 0;
+ return sum(this.getCountersForStage(stage), c =>
c.segmentGenerationProgress?.[field] || 0);
+ }
+
getChannelCounterNamesForStage(stage: StageDefinition): ChannelCounterName[]
{
const { definition } = stage;
@@ -416,8 +451,7 @@ export class Stages {
const channelCounters = this.getChannelCounterNamesForStage(stage);
const forStageCounters = counters?.[stageNumber] || {};
- return Object.keys(forStageCounters).map(key => {
- const stageCounters = forStageCounters[key];
+ return Object.entries(forStageCounters).map(([key, stageCounters]) => {
const newWideCounter: SimpleWideCounter = {
index: Number(key),
};
@@ -433,6 +467,7 @@ export class Stages {
}
: zeroChannelFields();
}
+ newWideCounter.segmentGenerationProgress =
stageCounters.segmentGenerationProgress;
return newWideCounter;
});
}
diff --git
a/web-console/src/views/workbench-view/execution-details-pane/execution-details-pane.tsx
b/web-console/src/views/workbench-view/execution-details-pane/execution-details-pane.tsx
index a0729ec0fe..da6893d208 100644
---
a/web-console/src/views/workbench-view/execution-details-pane/execution-details-pane.tsx
+++
b/web-console/src/views/workbench-view/execution-details-pane/execution-details-pane.tsx
@@ -16,7 +16,7 @@
* limitations under the License.
*/
import { IconNames } from '@blueprintjs/icons';
-import { RefName } from 'druid-query-toolkit';
+import { T } from 'druid-query-toolkit';
import * as JSONBig from 'json-bigint-native';
import React, { useState } from 'react';
@@ -51,7 +51,7 @@ export const ExecutionDetailsPane = React.memo(function
ExecutionDetailsPane(
return (
<div>
<p>{`General info for ${execution.id}${
- ingestDatasource ? ` ingesting into
${RefName.create(ingestDatasource, true)}` : ''
+ ingestDatasource ? ` ingesting into ${T(ingestDatasource)}` : ''
}`}</p>
{execution.error && <ExecutionErrorPane execution={execution} />}
{execution.stages ? (
diff --git
a/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
b/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
index 9464de13ea..250ddaea8a 100644
---
a/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
+++
b/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
@@ -31,6 +31,7 @@ import type {
ClusterBy,
CounterName,
Execution,
+ SegmentGenerationProgressFields,
SimpleWideCounter,
StageDefinition,
} from '../../../druid-models';
@@ -125,6 +126,8 @@ export const ExecutionStagesPane = React.memo(function
ExecutionStagesPane(
...stages.getInputCountersForStage(stage, 'rows').map(formatRows),
formatRows(stages.getTotalCounterForStage(stage, 'output', 'rows')),
formatRows(stages.getTotalCounterForStage(stage, 'shuffle', 'rows')),
+ formatRows(stages.getTotalSegmentGenerationProgressForStage(stage,
'rowsMerged')),
+ formatRows(stages.getTotalSegmentGenerationProgressForStage(stage,
'rowsPushed')),
]);
const filesValues = filterMap(stages.stages, stage => {
@@ -164,6 +167,18 @@ export const ExecutionStagesPane = React.memo(function
ExecutionStagesPane(
});
}
+ const isSegmentGenerator = Stages.stageType(stage) === 'segmentGenerator';
+ let bracesSegmentRowsMerged: string[] = [];
+ let bracesSegmentRowsPushed: string[] = [];
+ if (isSegmentGenerator) {
+ bracesSegmentRowsMerged = wideCounters.map(wideCounter =>
+ formatRows(wideCounter.segmentGenerationProgress?.rowsMerged || 0),
+ );
+ bracesSegmentRowsPushed = wideCounters.map(wideCounter =>
+ formatRows(wideCounter.segmentGenerationProgress?.rowsPushed || 0),
+ );
+ }
+
return (
<ReactTable
className="detail-counters-for-workers"
@@ -236,6 +251,30 @@ export const ExecutionStagesPane = React.memo(function
ExecutionStagesPane(
},
};
}),
+ Stages.stageType(stage) === 'segmentGenerator'
+ ? [
+ {
+ Header: twoLines('Merged', <i>rows</i>),
+ id: 'segmentGeneration_rowsMerged',
+ accessor: d => d.segmentGenerationProgress?.rowsMerged || 0,
+ className: 'padded',
+ width: 180,
+ Cell({ value }) {
+ return <BracedText text={formatRows(value)}
braces={bracesSegmentRowsMerged} />;
+ },
+ },
+ {
+ Header: twoLines('Pushed', <i>rows</i>),
+ id: 'segmentGeneration_rowsPushed',
+ accessor: d => d.segmentGenerationProgress?.rowsPushed || 0,
+ className: 'padded',
+ width: 180,
+ Cell({ value }) {
+ return <BracedText text={formatRows(value)}
braces={bracesSegmentRowsPushed} />;
+ },
+ },
+ ]
+ : [],
)}
/>
);
@@ -420,6 +459,22 @@ ${title} uncompressed size: ${formatBytesCompact(
);
}
+ function dataProcessedSegmentGeneration(
+ stage: StageDefinition,
+ field: SegmentGenerationProgressFields,
+ ) {
+ if (!stages.hasCounterForStage(stage, 'segmentGenerationProgress')) return;
+
+ return (
+ <div className="data-transfer">
+ <BracedText
+
text={formatRows(stages.getTotalSegmentGenerationProgressForStage(stage,
field))}
+ braces={rowsValues}
+ />
+ </div>
+ );
+ }
+
return (
<ReactTable
className={classNames('execution-stages-pane', DEFAULT_TABLE_CLASS_NAME)}
@@ -510,10 +565,17 @@ ${title} uncompressed size: ${formatBytesCompact(
<>
<div className="counter-spacer extend-right" />
<div>{stages.getStageCounterTitle(stage, 'output')}</div>
+ {stages.hasCounterForStage(stage, 'shuffle') && (
+ <div>{stages.getStageCounterTitle(stage,
'shuffle')}</div>
+ )}
</>
)}
- {stages.hasCounterForStage(stage, 'shuffle') && (
- <div>{stages.getStageCounterTitle(stage, 'shuffle')}</div>
+ {stages.hasCounterForStage(stage, 'segmentGenerationProgress')
&& (
+ <>
+ <div className="counter-spacer extend-right" />
+ <div>Merged</div>
+ <div>Pushed</div>
+ </>
)}
</>
);
@@ -536,10 +598,19 @@ ${title} uncompressed size: ${formatBytesCompact(
: dataProcessedInput(stage, i),
)}
{stages.hasCounterForStage(stage, 'output') && (
- <div className="counter-spacer extend-left" />
+ <>
+ <div className="counter-spacer extend-left" />
+ {dataProcessedOutput(stage)}
+ {dataProcessedShuffle(stage)}
+ </>
+ )}
+ {stages.hasCounterForStage(stage, 'segmentGenerationProgress')
&& (
+ <>
+ <div className="counter-spacer extend-left" />
+ {dataProcessedSegmentGeneration(stage, 'rowsMerged')}
+ {dataProcessedSegmentGeneration(stage, 'rowsPushed')}
+ </>
)}
- {dataProcessedOutput(stage)}
- {dataProcessedShuffle(stage)}
</>
);
},
diff --git
a/web-console/src/views/workbench-view/ingest-success-pane/__snapshots__/ingest-success-pane.spec.tsx.snap
b/web-console/src/views/workbench-view/ingest-success-pane/__snapshots__/ingest-success-pane.spec.tsx.snap
index 31b434cce1..df513e4a6d 100644
---
a/web-console/src/views/workbench-view/ingest-success-pane/__snapshots__/ingest-success-pane.spec.tsx.snap
+++
b/web-console/src/views/workbench-view/ingest-success-pane/__snapshots__/ingest-success-pane.spec.tsx.snap
@@ -5,7 +5,7 @@ exports[`IngestSuccessPane matches snapshot 1`] = `
className="ingest-success-pane"
>
<p>
- 465,346 rows inserted into 'kttm_simple'.
+ 465,346 rows inserted into "kttm_simple".
</p>
<p>
Insert query took 0:00:09.
diff --git
a/web-console/src/views/workbench-view/ingest-success-pane/ingest-success-pane.tsx
b/web-console/src/views/workbench-view/ingest-success-pane/ingest-success-pane.tsx
index 37639ae053..39a52798c5 100644
---
a/web-console/src/views/workbench-view/ingest-success-pane/ingest-success-pane.tsx
+++
b/web-console/src/views/workbench-view/ingest-success-pane/ingest-success-pane.tsx
@@ -56,7 +56,7 @@ export const IngestSuccessPane = React.memo(function
IngestSuccessPane(
return (
<div className="ingest-success-pane">
<p>
- {`${rows < 0 ? 'Data' : pluralIfNeeded(rows, 'row')} inserted into
'${datasource}'.`}
+ {`${rows < 0 ? 'Data' : pluralIfNeeded(rows, 'row')} inserted into
${T(datasource)}.`}
{warnings > 0 && (
<>
{' '}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]