This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 06268bf060e only pick kafka input format by default when needed
(#16180)
06268bf060e is described below
commit 06268bf060ee044928e395015830fe5cb37eb67c
Author: Vadim Ogievetsky <[email protected]>
AuthorDate: Mon Apr 1 13:47:49 2024 -0700
only pick kafka input format by default when needed (#16180)
---
.../ingestion-spec/ingestion-spec.spec.ts | 31 ++++++++++++++++++++++
.../druid-models/ingestion-spec/ingestion-spec.tsx | 23 +++++++++++++---
2 files changed, 50 insertions(+), 4 deletions(-)
diff --git a/web-console/src/druid-models/ingestion-spec/ingestion-spec.spec.ts b/web-console/src/druid-models/ingestion-spec/ingestion-spec.spec.ts
index febc7613918..d58e8c07792 100644
--- a/web-console/src/druid-models/ingestion-spec/ingestion-spec.spec.ts
+++ b/web-console/src/druid-models/ingestion-spec/ingestion-spec.spec.ts
@@ -24,6 +24,7 @@ import {
cleanSpec,
guessColumnTypeFromInput,
guessColumnTypeFromSampleResponse,
+ guessKafkaInputFormat,
guessSimpleInputFormat,
updateSchemaWithSample,
upgradeSpec,
@@ -669,6 +670,36 @@ describe('ingestion-spec', () => {
});
});
});
+
+ describe('guessKafkaInputFormat', () => {
+ const sample = [
+ {
+ 'kafka.timestamp': 1710962988515,
+ 'kafka.topic': 'kttm2',
+ 'raw':
+
'{"timestamp":"2019-08-25T00:00:00.031Z","session":"S56194838","number":"16","event":{"type":"PercentClear","percentage":55},"agent":{"type":"Browser","category":"Personal
computer","browser":"Chrome","browser_version":"76.0.3809.100","os":"Windows
7","platform":"Windows"},"client_ip":"181.13.41.82","geo_ip":{"continent":"South
America","country":"Argentina","region":"Santa
Fe","city":"Rosario"},"language":["es","es-419"],"adblock_list":"NoAdblock","app_version":"1.9.6","path":
[...]
+ },
+ {
+ 'kafka.timestamp': 1710962988518,
+ 'kafka.topic': 'kttm2',
+ 'raw':
+
'{"timestamp":"2019-08-25T00:00:00.059Z","session":"S46093731","number":"24","event":{"type":"PercentClear","percentage":85},"agent":{"type":"Mobile
Browser","category":"Smartphone","browser":"Chrome
Mobile","browser_version":"50.0.2661.89","os":"Android","platform":"Android"},"client_ip":"177.242.100.0","geo_ip":{"continent":"North
America","country":"Mexico","region":"Chihuahua","city":"Nuevo Casas
Grandes"},"language":["en","es","es-419","es-MX"],"adblock_list":"NoAdblock","
[...]
+ },
+ ];
+
+ it('works when single topic', () => {
+ expect(guessKafkaInputFormat(sample, false)).toEqual({ type: 'json' });
+ });
+
+ it('works when multi-topic', () => {
+ expect(guessKafkaInputFormat(sample, true)).toEqual({
+ type: 'kafka',
+ valueFormat: {
+ type: 'json',
+ },
+ });
+ });
+ });
});
describe('spec utils', () => {
diff --git a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
index ec765b9c345..205d287ac2b 100644
--- a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
+++ b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
@@ -2418,7 +2418,10 @@ export function fillInputFormatIfNeeded(
spec,
'spec.ioConfig.inputFormat',
getSpecType(spec) === 'kafka'
- ? guessKafkaInputFormat(filterMap(sampleResponse.data, l => l.input))
+ ? guessKafkaInputFormat(
+ filterMap(sampleResponse.data, l => l.input),
+ typeof deepGet(spec, 'spec.ioConfig.topicPattern') === 'string',
+ )
: guessSimpleInputFormat(
filterMap(sampleResponse.data, l => l.input?.raw),
isStreamingSpec(spec),
@@ -2430,15 +2433,27 @@ function noNumbers(xs: string[]): boolean {
return xs.every(x => isNaN(Number(x)));
}
-export function guessKafkaInputFormat(sampleRaw: Record<string, any>[]): InputFormat {
+export function guessKafkaInputFormat(
+ sampleRaw: Record<string, any>[],
+ multiTopic: boolean,
+): InputFormat {
const hasHeader = sampleRaw.some(x => Object.keys(x).some(k => k.startsWith('kafka.header.')));
const keys = filterMap(sampleRaw, x => x['kafka.key']);
- const payloads = filterMap(sampleRaw, x => x.raw);
+ const valueFormat = guessSimpleInputFormat(
+ filterMap(sampleRaw, x => x.raw),
+ true,
+ );
+
+ if (!hasHeader && !keys.length && !multiTopic) {
+ // No headers or keys and just a single topic means do not pick the 'kafka' format by default as it is less performant
+ return valueFormat;
+ }
+
return {
type: 'kafka',
headerFormat: hasHeader ? { type: 'string' } : undefined,
keyFormat: keys.length ? guessSimpleInputFormat(keys, true) : undefined,
- valueFormat: guessSimpleInputFormat(payloads, true),
+ valueFormat,
};
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]