This is an automated email from the ASF dual-hosted git repository.

vogievetsky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 56c03582cf1 support kinesis input format (#16850)
56c03582cf1 is described below

commit 56c03582cf1e969fa36717ba843b1119211e5eda
Author: Vadim Ogievetsky <[email protected]>
AuthorDate: Wed Aug 7 10:24:28 2024 -0700

    support kinesis input format (#16850)
---
 .../druid-models/ingestion-spec/ingestion-spec.tsx |  3 +-
 .../src/druid-models/input-format/input-format.tsx | 42 ++++++++++++-
 web-console/src/utils/sampler.ts                   | 11 +++-
 .../src/views/load-data-view/load-data-view.tsx    | 72 +++++++++++++++-------
 4 files changed, 102 insertions(+), 26 deletions(-)

diff --git a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx 
b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
index d51fd34e901..3a7f0ae5674 100644
--- a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
+++ b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
@@ -2441,11 +2441,12 @@ export function fillInputFormatIfNeeded(
   sampleResponse: SampleResponse,
 ): Partial<IngestionSpec> {
   if (deepGet(spec, 'spec.ioConfig.inputFormat.type')) return spec;
+  const specType = getSpecType(spec);
 
   return deepSet(
     spec,
     'spec.ioConfig.inputFormat',
-    getSpecType(spec) === 'kafka'
+    specType === 'kafka'
       ? guessKafkaInputFormat(
           filterMap(sampleResponse.data, l => l.input),
           typeof deepGet(spec, 'spec.ioConfig.topicPattern') === 'string',
diff --git a/web-console/src/druid-models/input-format/input-format.tsx 
b/web-console/src/druid-models/input-format/input-format.tsx
index a5a1fbdfa07..970e88e2b99 100644
--- a/web-console/src/druid-models/input-format/input-format.tsx
+++ b/web-console/src/druid-models/input-format/input-format.tsx
@@ -60,16 +60,29 @@ const KNOWN_TYPES = [
   'avro_stream',
   'protobuf',
   'regex',
-  'kafka',
   'javascript',
+  'kafka',
+  'kinesis',
 ];
+
 function generateInputFormatFields(streaming: boolean) {
   return compact([
     {
       name: 'type',
       label: 'Input format',
       type: 'string',
-      suggestions: KNOWN_TYPES,
+      suggestions: [
+        'json',
+        'csv',
+        'tsv',
+        'parquet',
+        'orc',
+        'avro_ocf',
+        'avro_stream',
+        'protobuf',
+        'regex',
+        'javascript',
+      ],
       required: true,
       info: (
         <>
@@ -606,12 +619,35 @@ export const KAFKA_METADATA_INPUT_FORMAT_FIELDS: 
Field<InputFormat>[] = [
   },
 ];
 
+export const KINESIS_METADATA_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
+  {
+    name: 'timestampColumnName',
+    label: 'Kinesis timestamp column name',
+    type: 'string',
+    defaultValue: 'kinesis.timestamp',
+    defined: typeIsKnown(KNOWN_TYPES, 'kinesis'),
+    info: `The name of the column for the Kinesis timestamp.`,
+  },
+  {
+    name: 'partitionKeyColumnName',
+    label: 'Kinesis partition key column name',
+    type: 'string',
+    defaultValue: 'kinesis.partitionKey',
+    defined: typeIsKnown(KNOWN_TYPES, 'kinesis'),
+    info: `The name of the column for the Kinesis partition key. This field is 
useful when ingesting data from multiple partitions into the same datasource.`,
+  },
+];
+
 export function issueWithInputFormat(inputFormat: InputFormat | undefined): 
string | undefined {
   return AutoForm.issueWithModel(inputFormat, BATCH_INPUT_FORMAT_FIELDS);
 }
 
+export function isKafkaOrKinesis(type: string | undefined): type is 'kafka' | 
'kinesis' {
+  return type === 'kafka' || type === 'kinesis';
+}
+
 export function inputFormatCanProduceNestedData(inputFormat: InputFormat): 
boolean {
-  if (inputFormat.type === 'kafka') {
+  if (isKafkaOrKinesis(inputFormat.type)) {
     return Boolean(
       inputFormat.valueFormat && 
inputFormatCanProduceNestedData(inputFormat.valueFormat),
     );
diff --git a/web-console/src/utils/sampler.ts b/web-console/src/utils/sampler.ts
index 28489c52400..cc9ae32b8ae 100644
--- a/web-console/src/utils/sampler.ts
+++ b/web-console/src/utils/sampler.ts
@@ -251,6 +251,11 @@ const KAFKA_SAMPLE_INPUT_FORMAT: InputFormat = {
   valueFormat: WHOLE_ROW_INPUT_FORMAT,
 };
 
+const KINESIS_SAMPLE_INPUT_FORMAT: InputFormat = {
+  type: 'kinesis',
+  valueFormat: WHOLE_ROW_INPUT_FORMAT,
+};
+
 export async function sampleForConnect(
   spec: Partial<IngestionSpec>,
   sampleStrategy: SampleStrategy,
@@ -267,7 +272,11 @@ export async function sampleForConnect(
     ioConfig = deepSet(
       ioConfig,
       'inputFormat',
-      samplerType === 'kafka' ? KAFKA_SAMPLE_INPUT_FORMAT : 
WHOLE_ROW_INPUT_FORMAT,
+      samplerType === 'kafka'
+        ? KAFKA_SAMPLE_INPUT_FORMAT
+        : samplerType === 'kinesis'
+        ? KINESIS_SAMPLE_INPUT_FORMAT
+        : WHOLE_ROW_INPUT_FORMAT,
     );
   }
 
diff --git a/web-console/src/views/load-data-view/load-data-view.tsx 
b/web-console/src/views/load-data-view/load-data-view.tsx
index 0fdf68c88c1..968b53888c6 100644
--- a/web-console/src/views/load-data-view/load-data-view.tsx
+++ b/web-console/src/views/load-data-view/load-data-view.tsx
@@ -113,11 +113,13 @@ import {
   invalidPartitionConfig,
   isDruidSource,
   isEmptyIngestionSpec,
+  isKafkaOrKinesis,
   isStreamingSpec,
   issueWithIoConfig,
   issueWithSampleData,
   joinFilter,
   KAFKA_METADATA_INPUT_FORMAT_FIELDS,
+  KINESIS_METADATA_INPUT_FORMAT_FIELDS,
   KNOWN_FILTER_TYPES,
   MAX_INLINE_DATA_LENGTH,
   METRIC_SPEC_FIELDS,
@@ -244,30 +246,44 @@ function showKafkaLine(line: SampleEntry): string {
   ]).join('\n');
 }
 
+function showKinesisLine(line: SampleEntry): string {
+  const { input } = line;
+  if (!input) return 'Invalid kinesis row';
+  return compact([
+    `[ Kinesis timestamp: ${input['kinesis.timestamp']}`,
+    input['kinesis.partitionKey'] ? `  Partition key: 
${input['kinesis.partitionKey']}` : undefined,
+    `  Payload: ${input.raw}`,
+    ']',
+  ]).join('\n');
+}
+
 function showBlankLine(line: SampleEntry): string {
   return line.parsed ? `[Row: ${JSONBig.stringify(line.parsed)}]` : '[Binary 
data]';
 }
 
 function formatSampleEntries(
   sampleEntries: SampleEntry[],
-  druidSource: boolean,
-  kafkaSource: boolean,
+  specialSource: undefined | 'druid' | 'kafka' | 'kinesis',
 ): string {
   if (!sampleEntries.length) return 'No data returned from sampler';
 
-  if (druidSource) {
-    return sampleEntries.map(showDruidLine).join('\n');
-  }
+  switch (specialSource) {
+    case 'druid':
+      return sampleEntries.map(showDruidLine).join('\n');
 
-  if (kafkaSource) {
-    return sampleEntries.map(showKafkaLine).join('\n');
-  }
+    case 'kafka':
+      return sampleEntries.map(showKafkaLine).join('\n');
 
-  return (
-    sampleEntries.every(l => !l.parsed)
-      ? sampleEntries.map(showBlankLine)
-      : sampleEntries.map(showRawLine)
-  ).join('\n');
+    case 'kinesis':
+      return sampleEntries.map(showKinesisLine).join('\n');
+
+    default:
+      return (
+        sampleEntries.every(l => !l.parsed)
+          ? sampleEntries.map(showBlankLine)
+          : sampleEntries.map(showRawLine)
+      ).join('\n');
+  }
 }
 
 function getTimestampSpec(sampleResponse: SampleResponse | null): 
TimestampSpec {
@@ -1239,10 +1255,11 @@ export class LoadDataView extends 
React.PureComponent<LoadDataViewProps, LoadDat
   renderConnectStep() {
     const { inputQueryState, sampleStrategy } = this.state;
     const spec = this.getEffectiveSpec();
+    const specType = getSpecType(spec);
     const ioConfig: IoConfig = deepGet(spec, 'spec.ioConfig') || EMPTY_OBJECT;
     const inlineMode = deepGet(spec, 'spec.ioConfig.inputSource.type') === 
'inline';
     const druidSource = isDruidSource(spec);
-    const kafkaSource = getSpecType(spec) === 'kafka';
+    const specialSource = druidSource ? 'druid' : isKafkaOrKinesis(specType) ? 
specType : undefined;
 
     let mainFill: JSX.Element | string;
     if (inlineMode) {
@@ -1274,7 +1291,7 @@ export class LoadDataView extends 
React.PureComponent<LoadDataViewProps, LoadDat
             <TextArea
               className="raw-lines"
               readOnly
-              value={formatSampleEntries(inputData, druidSource, kafkaSource)}
+              value={formatSampleEntries(inputData, specialSource)}
             />
           )}
           {inputQueryState.isLoading() && <Loader />}
@@ -1544,11 +1561,11 @@ export class LoadDataView extends 
React.PureComponent<LoadDataViewProps, LoadDat
           <ParserMessage />
           {!selectedFlattenField && (
             <>
-              {specType !== 'kafka' ? (
+              {!isKafkaOrKinesis(specType) ? (
                 normalInputAutoForm
               ) : (
                 <>
-                  {inputFormat?.type !== 'kafka' ? (
+                  {!isKafkaOrKinesis(inputFormat?.type) ? (
                     normalInputAutoForm
                   ) : (
                     <AutoForm
@@ -1563,18 +1580,22 @@ export class LoadDataView extends 
React.PureComponent<LoadDataViewProps, LoadDat
                   )}
                   <FormGroup className="parse-metadata">
                     <Switch
-                      label="Parse Kafka metadata (ts, headers, key)"
-                      checked={inputFormat?.type === 'kafka'}
+                      label={
+                        specType === 'kafka'
+                          ? 'Parse Kafka metadata (ts, headers, key)'
+                          : 'Parse Kinesis metadata (ts, partition key)'
+                      }
+                      checked={isKafkaOrKinesis(inputFormat?.type)}
                       onChange={() => {
                         this.updateSpecPreview(
-                          inputFormat?.type === 'kafka'
+                          isKafkaOrKinesis(inputFormat?.type)
                             ? deepMove(
                                 spec,
                                 'spec.ioConfig.inputFormat.valueFormat',
                                 'spec.ioConfig.inputFormat',
                               )
                             : deepSet(spec, 'spec.ioConfig.inputFormat', {
-                                type: 'kafka',
+                                type: specType,
                                 valueFormat: inputFormat,
                               }),
                         );
@@ -1590,6 +1611,15 @@ export class LoadDataView extends 
React.PureComponent<LoadDataViewProps, LoadDat
                       }
                     />
                   )}
+                  {inputFormat?.type === 'kinesis' && (
+                    <AutoForm
+                      fields={KINESIS_METADATA_INPUT_FORMAT_FIELDS}
+                      model={inputFormat}
+                      onChange={p =>
+                        this.updateSpecPreview(deepSet(spec, 
'spec.ioConfig.inputFormat', p))
+                      }
+                    />
+                  )}
                 </>
               )}
               {possibleSystemFields.length > 0 && (


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to