This is an automated email from the ASF dual-hosted git repository.
vogievetsky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 56c03582cf1 support kinesis input format (#16850)
56c03582cf1 is described below
commit 56c03582cf1e969fa36717ba843b1119211e5eda
Author: Vadim Ogievetsky <[email protected]>
AuthorDate: Wed Aug 7 10:24:28 2024 -0700
support kinesis input format (#16850)
---
.../druid-models/ingestion-spec/ingestion-spec.tsx | 3 +-
.../src/druid-models/input-format/input-format.tsx | 42 ++++++++++++-
web-console/src/utils/sampler.ts | 11 +++-
.../src/views/load-data-view/load-data-view.tsx | 72 +++++++++++++++-------
4 files changed, 102 insertions(+), 26 deletions(-)
diff --git a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
index d51fd34e901..3a7f0ae5674 100644
--- a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
+++ b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
@@ -2441,11 +2441,12 @@ export function fillInputFormatIfNeeded(
sampleResponse: SampleResponse,
): Partial<IngestionSpec> {
if (deepGet(spec, 'spec.ioConfig.inputFormat.type')) return spec;
+ const specType = getSpecType(spec);
return deepSet(
spec,
'spec.ioConfig.inputFormat',
- getSpecType(spec) === 'kafka'
+ specType === 'kafka'
? guessKafkaInputFormat(
filterMap(sampleResponse.data, l => l.input),
typeof deepGet(spec, 'spec.ioConfig.topicPattern') === 'string',
diff --git a/web-console/src/druid-models/input-format/input-format.tsx
b/web-console/src/druid-models/input-format/input-format.tsx
index a5a1fbdfa07..970e88e2b99 100644
--- a/web-console/src/druid-models/input-format/input-format.tsx
+++ b/web-console/src/druid-models/input-format/input-format.tsx
@@ -60,16 +60,29 @@ const KNOWN_TYPES = [
'avro_stream',
'protobuf',
'regex',
- 'kafka',
'javascript',
+ 'kafka',
+ 'kinesis',
];
+
function generateInputFormatFields(streaming: boolean) {
return compact([
{
name: 'type',
label: 'Input format',
type: 'string',
- suggestions: KNOWN_TYPES,
+ suggestions: [
+ 'json',
+ 'csv',
+ 'tsv',
+ 'parquet',
+ 'orc',
+ 'avro_ocf',
+ 'avro_stream',
+ 'protobuf',
+ 'regex',
+ 'javascript',
+ ],
required: true,
info: (
<>
@@ -606,12 +619,35 @@ export const KAFKA_METADATA_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
},
];
+export const KINESIS_METADATA_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
+ {
+ name: 'timestampColumnName',
+ label: 'Kinesis timestamp column name',
+ type: 'string',
+ defaultValue: 'kinesis.timestamp',
+ defined: typeIsKnown(KNOWN_TYPES, 'kinesis'),
+ info: `The name of the column for the Kinesis timestamp.`,
+ },
+ {
+ name: 'partitionKeyColumnName',
+ label: 'Kinesis partition key column name',
+ type: 'string',
+ defaultValue: 'kinesis.partitionKey',
+ defined: typeIsKnown(KNOWN_TYPES, 'kinesis'),
+    info: `The name of the column for the Kinesis partition key. This field is useful when ingesting data from multiple partitions into the same datasource.`,
+ },
+];
+
export function issueWithInputFormat(inputFormat: InputFormat | undefined): string | undefined {
return AutoForm.issueWithModel(inputFormat, BATCH_INPUT_FORMAT_FIELDS);
}
+export function isKafkaOrKinesis(type: string | undefined): type is 'kafka' | 'kinesis' {
+ return type === 'kafka' || type === 'kinesis';
+}
+
export function inputFormatCanProduceNestedData(inputFormat: InputFormat): boolean {
- if (inputFormat.type === 'kafka') {
+ if (isKafkaOrKinesis(inputFormat.type)) {
return Boolean(
inputFormat.valueFormat &&
inputFormatCanProduceNestedData(inputFormat.valueFormat),
);
diff --git a/web-console/src/utils/sampler.ts b/web-console/src/utils/sampler.ts
index 28489c52400..cc9ae32b8ae 100644
--- a/web-console/src/utils/sampler.ts
+++ b/web-console/src/utils/sampler.ts
@@ -251,6 +251,11 @@ const KAFKA_SAMPLE_INPUT_FORMAT: InputFormat = {
valueFormat: WHOLE_ROW_INPUT_FORMAT,
};
+const KINESIS_SAMPLE_INPUT_FORMAT: InputFormat = {
+ type: 'kinesis',
+ valueFormat: WHOLE_ROW_INPUT_FORMAT,
+};
+
export async function sampleForConnect(
spec: Partial<IngestionSpec>,
sampleStrategy: SampleStrategy,
@@ -267,7 +272,11 @@ export async function sampleForConnect(
ioConfig = deepSet(
ioConfig,
'inputFormat',
-    samplerType === 'kafka' ? KAFKA_SAMPLE_INPUT_FORMAT : WHOLE_ROW_INPUT_FORMAT,
+ samplerType === 'kafka'
+ ? KAFKA_SAMPLE_INPUT_FORMAT
+ : samplerType === 'kinesis'
+ ? KINESIS_SAMPLE_INPUT_FORMAT
+ : WHOLE_ROW_INPUT_FORMAT,
);
}
diff --git a/web-console/src/views/load-data-view/load-data-view.tsx b/web-console/src/views/load-data-view/load-data-view.tsx
index 0fdf68c88c1..968b53888c6 100644
--- a/web-console/src/views/load-data-view/load-data-view.tsx
+++ b/web-console/src/views/load-data-view/load-data-view.tsx
@@ -113,11 +113,13 @@ import {
invalidPartitionConfig,
isDruidSource,
isEmptyIngestionSpec,
+ isKafkaOrKinesis,
isStreamingSpec,
issueWithIoConfig,
issueWithSampleData,
joinFilter,
KAFKA_METADATA_INPUT_FORMAT_FIELDS,
+ KINESIS_METADATA_INPUT_FORMAT_FIELDS,
KNOWN_FILTER_TYPES,
MAX_INLINE_DATA_LENGTH,
METRIC_SPEC_FIELDS,
@@ -244,30 +246,44 @@ function showKafkaLine(line: SampleEntry): string {
]).join('\n');
}
+function showKinesisLine(line: SampleEntry): string {
+ const { input } = line;
+ if (!input) return 'Invalid kinesis row';
+ return compact([
+ `[ Kinesis timestamp: ${input['kinesis.timestamp']}`,
+    input['kinesis.partitionKey'] ? ` Partition key: ${input['kinesis.partitionKey']}` : undefined,
+ ` Payload: ${input.raw}`,
+ ']',
+ ]).join('\n');
+}
+
function showBlankLine(line: SampleEntry): string {
return line.parsed ? `[Row: ${JSONBig.stringify(line.parsed)}]` : '[Binary data]';
}
function formatSampleEntries(
sampleEntries: SampleEntry[],
- druidSource: boolean,
- kafkaSource: boolean,
+ specialSource: undefined | 'druid' | 'kafka' | 'kinesis',
): string {
if (!sampleEntries.length) return 'No data returned from sampler';
- if (druidSource) {
- return sampleEntries.map(showDruidLine).join('\n');
- }
+ switch (specialSource) {
+ case 'druid':
+ return sampleEntries.map(showDruidLine).join('\n');
- if (kafkaSource) {
- return sampleEntries.map(showKafkaLine).join('\n');
- }
+ case 'kafka':
+ return sampleEntries.map(showKafkaLine).join('\n');
- return (
- sampleEntries.every(l => !l.parsed)
- ? sampleEntries.map(showBlankLine)
- : sampleEntries.map(showRawLine)
- ).join('\n');
+ case 'kinesis':
+ return sampleEntries.map(showKinesisLine).join('\n');
+
+ default:
+ return (
+ sampleEntries.every(l => !l.parsed)
+ ? sampleEntries.map(showBlankLine)
+ : sampleEntries.map(showRawLine)
+ ).join('\n');
+ }
}
function getTimestampSpec(sampleResponse: SampleResponse | null): TimestampSpec {
@@ -1239,10 +1255,11 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
renderConnectStep() {
const { inputQueryState, sampleStrategy } = this.state;
const spec = this.getEffectiveSpec();
+ const specType = getSpecType(spec);
const ioConfig: IoConfig = deepGet(spec, 'spec.ioConfig') || EMPTY_OBJECT;
const inlineMode = deepGet(spec, 'spec.ioConfig.inputSource.type') === 'inline';
const druidSource = isDruidSource(spec);
- const kafkaSource = getSpecType(spec) === 'kafka';
+    const specialSource = druidSource ? 'druid' : isKafkaOrKinesis(specType) ? specType : undefined;
let mainFill: JSX.Element | string;
if (inlineMode) {
@@ -1274,7 +1291,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
<TextArea
className="raw-lines"
readOnly
- value={formatSampleEntries(inputData, druidSource, kafkaSource)}
+ value={formatSampleEntries(inputData, specialSource)}
/>
)}
{inputQueryState.isLoading() && <Loader />}
@@ -1544,11 +1561,11 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
<ParserMessage />
{!selectedFlattenField && (
<>
- {specType !== 'kafka' ? (
+ {!isKafkaOrKinesis(specType) ? (
normalInputAutoForm
) : (
<>
- {inputFormat?.type !== 'kafka' ? (
+ {!isKafkaOrKinesis(inputFormat?.type) ? (
normalInputAutoForm
) : (
<AutoForm
@@ -1563,18 +1580,22 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
)}
<FormGroup className="parse-metadata">
<Switch
- label="Parse Kafka metadata (ts, headers, key)"
- checked={inputFormat?.type === 'kafka'}
+ label={
+ specType === 'kafka'
+ ? 'Parse Kafka metadata (ts, headers, key)'
+ : 'Parse Kinesis metadata (ts, partition key)'
+ }
+ checked={isKafkaOrKinesis(inputFormat?.type)}
onChange={() => {
this.updateSpecPreview(
- inputFormat?.type === 'kafka'
+ isKafkaOrKinesis(inputFormat?.type)
? deepMove(
spec,
'spec.ioConfig.inputFormat.valueFormat',
'spec.ioConfig.inputFormat',
)
: deepSet(spec, 'spec.ioConfig.inputFormat', {
- type: 'kafka',
+ type: specType,
valueFormat: inputFormat,
}),
);
@@ -1590,6 +1611,15 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
}
/>
)}
+ {inputFormat?.type === 'kinesis' && (
+ <AutoForm
+ fields={KINESIS_METADATA_INPUT_FORMAT_FIELDS}
+ model={inputFormat}
+ onChange={p =>
+              this.updateSpecPreview(deepSet(spec, 'spec.ioConfig.inputFormat', p))
+ }
+ />
+ )}
</>
)}
{possibleSystemFields.length > 0 && (
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]