This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git

The following commit(s) were added to refs/heads/master by this push:
     new 06268bf060e only pick kafka input format by default when needed (#16180)
06268bf060e is described below

commit 06268bf060ee044928e395015830fe5cb37eb67c
Author: Vadim Ogievetsky <va...@ogievetsky.com>
AuthorDate: Mon Apr 1 13:47:49 2024 -0700

    only pick kafka input format by default when needed (#16180)
---
 .../ingestion-spec/ingestion-spec.spec.ts          | 31 ++++++++++++++++++++++
 .../druid-models/ingestion-spec/ingestion-spec.tsx | 23 +++++++++++++---
 2 files changed, 50 insertions(+), 4 deletions(-)

diff --git a/web-console/src/druid-models/ingestion-spec/ingestion-spec.spec.ts b/web-console/src/druid-models/ingestion-spec/ingestion-spec.spec.ts
index febc7613918..d58e8c07792 100644
--- a/web-console/src/druid-models/ingestion-spec/ingestion-spec.spec.ts
+++ b/web-console/src/druid-models/ingestion-spec/ingestion-spec.spec.ts
@@ -24,6 +24,7 @@ import {
   cleanSpec,
   guessColumnTypeFromInput,
   guessColumnTypeFromSampleResponse,
+  guessKafkaInputFormat,
   guessSimpleInputFormat,
   updateSchemaWithSample,
   upgradeSpec,
@@ -669,6 +670,36 @@ describe('ingestion-spec', () => {
       });
     });
   });
+
+  describe('guessKafkaInputFormat', () => {
+    const sample = [
+      {
+        'kafka.timestamp': 1710962988515,
+        'kafka.topic': 'kttm2',
+        'raw':
+          '{"timestamp":"2019-08-25T00:00:00.031Z","session":"S56194838","number":"16","event":{"type":"PercentClear","percentage":55},"agent":{"type":"Browser","category":"Personal computer","browser":"Chrome","browser_version":"76.0.3809.100","os":"Windows 7","platform":"Windows"},"client_ip":"181.13.41.82","geo_ip":{"continent":"South America","country":"Argentina","region":"Santa Fe","city":"Rosario"},"language":["es","es-419"],"adblock_list":"NoAdblock","app_version":"1.9.6","path": [...]
+      },
+      {
+        'kafka.timestamp': 1710962988518,
+        'kafka.topic': 'kttm2',
+        'raw':
+          '{"timestamp":"2019-08-25T00:00:00.059Z","session":"S46093731","number":"24","event":{"type":"PercentClear","percentage":85},"agent":{"type":"Mobile Browser","category":"Smartphone","browser":"Chrome Mobile","browser_version":"50.0.2661.89","os":"Android","platform":"Android"},"client_ip":"177.242.100.0","geo_ip":{"continent":"North America","country":"Mexico","region":"Chihuahua","city":"Nuevo Casas Grandes"},"language":["en","es","es-419","es-MX"],"adblock_list":"NoAdblock"," [...]
+      },
+    ];
+
+    it('works when single topic', () => {
+      expect(guessKafkaInputFormat(sample, false)).toEqual({ type: 'json' });
+    });
+
+    it('works when multi-topic', () => {
+      expect(guessKafkaInputFormat(sample, true)).toEqual({
+        type: 'kafka',
+        valueFormat: {
+          type: 'json',
+        },
+      });
+    });
+  });
 });
 
 describe('spec utils', () => {
diff --git a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
index ec765b9c345..205d287ac2b 100644
--- a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
+++ b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
@@ -2418,7 +2418,10 @@ export function fillInputFormatIfNeeded(
     spec,
     'spec.ioConfig.inputFormat',
     getSpecType(spec) === 'kafka'
-      ? guessKafkaInputFormat(filterMap(sampleResponse.data, l => l.input))
+      ? guessKafkaInputFormat(
+          filterMap(sampleResponse.data, l => l.input),
+          typeof deepGet(spec, 'spec.ioConfig.topicPattern') === 'string',
+        )
       : guessSimpleInputFormat(
           filterMap(sampleResponse.data, l => l.input?.raw),
           isStreamingSpec(spec),
@@ -2430,15 +2433,27 @@ function noNumbers(xs: string[]): boolean {
   return xs.every(x => isNaN(Number(x)));
 }
 
-export function guessKafkaInputFormat(sampleRaw: Record<string, any>[]): InputFormat {
+export function guessKafkaInputFormat(
+  sampleRaw: Record<string, any>[],
+  multiTopic: boolean,
+): InputFormat {
   const hasHeader = sampleRaw.some(x => Object.keys(x).some(k => k.startsWith('kafka.header.')));
   const keys = filterMap(sampleRaw, x => x['kafka.key']);
-  const payloads = filterMap(sampleRaw, x => x.raw);
+  const valueFormat = guessSimpleInputFormat(
+    filterMap(sampleRaw, x => x.raw),
+    true,
+  );
+
+  if (!hasHeader && !keys.length && !multiTopic) {
+    // No headers or keys and just a single topic means do not pick the 'kafka' format by default as it is less performant
+    return valueFormat;
+  }
+
   return {
     type: 'kafka',
     headerFormat: hasHeader ? { type: 'string' } : undefined,
     keyFormat: keys.length ? guessSimpleInputFormat(keys, true) : undefined,
-    valueFormat: guessSimpleInputFormat(payloads, true),
+    valueFormat,
   };
 }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org