This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 06268bf060e only pick kafka input format by default when needed (#16180)
06268bf060e is described below

commit 06268bf060ee044928e395015830fe5cb37eb67c
Author: Vadim Ogievetsky <va...@ogievetsky.com>
AuthorDate: Mon Apr 1 13:47:49 2024 -0700

    only pick kafka input format by default when needed (#16180)
---
 .../ingestion-spec/ingestion-spec.spec.ts          | 31 ++++++++++++++++++++++
 .../druid-models/ingestion-spec/ingestion-spec.tsx | 23 +++++++++++++---
 2 files changed, 50 insertions(+), 4 deletions(-)

diff --git a/web-console/src/druid-models/ingestion-spec/ingestion-spec.spec.ts b/web-console/src/druid-models/ingestion-spec/ingestion-spec.spec.ts
index febc7613918..d58e8c07792 100644
--- a/web-console/src/druid-models/ingestion-spec/ingestion-spec.spec.ts
+++ b/web-console/src/druid-models/ingestion-spec/ingestion-spec.spec.ts
@@ -24,6 +24,7 @@ import {
   cleanSpec,
   guessColumnTypeFromInput,
   guessColumnTypeFromSampleResponse,
+  guessKafkaInputFormat,
   guessSimpleInputFormat,
   updateSchemaWithSample,
   upgradeSpec,
@@ -669,6 +670,36 @@ describe('ingestion-spec', () => {
       });
     });
   });
+
+  describe('guessKafkaInputFormat', () => {
+    const sample = [
+      {
+        'kafka.timestamp': 1710962988515,
+        'kafka.topic': 'kttm2',
+        'raw':
+          
'{"timestamp":"2019-08-25T00:00:00.031Z","session":"S56194838","number":"16","event":{"type":"PercentClear","percentage":55},"agent":{"type":"Browser","category":"Personal
 computer","browser":"Chrome","browser_version":"76.0.3809.100","os":"Windows 
7","platform":"Windows"},"client_ip":"181.13.41.82","geo_ip":{"continent":"South
 America","country":"Argentina","region":"Santa 
Fe","city":"Rosario"},"language":["es","es-419"],"adblock_list":"NoAdblock","app_version":"1.9.6","path":
 [...]
+      },
+      {
+        'kafka.timestamp': 1710962988518,
+        'kafka.topic': 'kttm2',
+        'raw':
+          
'{"timestamp":"2019-08-25T00:00:00.059Z","session":"S46093731","number":"24","event":{"type":"PercentClear","percentage":85},"agent":{"type":"Mobile
 Browser","category":"Smartphone","browser":"Chrome 
Mobile","browser_version":"50.0.2661.89","os":"Android","platform":"Android"},"client_ip":"177.242.100.0","geo_ip":{"continent":"North
 America","country":"Mexico","region":"Chihuahua","city":"Nuevo Casas 
Grandes"},"language":["en","es","es-419","es-MX"],"adblock_list":"NoAdblock"," 
[...]
+      },
+    ];
+
+    it('works when single topic', () => {
+      expect(guessKafkaInputFormat(sample, false)).toEqual({ type: 'json' });
+    });
+
+    it('works when multi-topic', () => {
+      expect(guessKafkaInputFormat(sample, true)).toEqual({
+        type: 'kafka',
+        valueFormat: {
+          type: 'json',
+        },
+      });
+    });
+  });
 });
 
 describe('spec utils', () => {
diff --git a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
index ec765b9c345..205d287ac2b 100644
--- a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
+++ b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
@@ -2418,7 +2418,10 @@ export function fillInputFormatIfNeeded(
     spec,
     'spec.ioConfig.inputFormat',
     getSpecType(spec) === 'kafka'
-      ? guessKafkaInputFormat(filterMap(sampleResponse.data, l => l.input))
+      ? guessKafkaInputFormat(
+          filterMap(sampleResponse.data, l => l.input),
+          typeof deepGet(spec, 'spec.ioConfig.topicPattern') === 'string',
+        )
       : guessSimpleInputFormat(
           filterMap(sampleResponse.data, l => l.input?.raw),
           isStreamingSpec(spec),
@@ -2430,15 +2433,27 @@ function noNumbers(xs: string[]): boolean {
   return xs.every(x => isNaN(Number(x)));
 }
 
-export function guessKafkaInputFormat(sampleRaw: Record<string, any>[]): InputFormat {
+export function guessKafkaInputFormat(
+  sampleRaw: Record<string, any>[],
+  multiTopic: boolean,
+): InputFormat {
   const hasHeader = sampleRaw.some(x => Object.keys(x).some(k => k.startsWith('kafka.header.')));
   const keys = filterMap(sampleRaw, x => x['kafka.key']);
-  const payloads = filterMap(sampleRaw, x => x.raw);
+  const valueFormat = guessSimpleInputFormat(
+    filterMap(sampleRaw, x => x.raw),
+    true,
+  );
+
+  if (!hasHeader && !keys.length && !multiTopic) {
+    // No headers or keys and just a single topic means do not pick the 'kafka' format by default as it is less performant
+    return valueFormat;
+  }
+
   return {
     type: 'kafka',
     headerFormat: hasHeader ? { type: 'string' } : undefined,
     keyFormat: keys.length ? guessSimpleInputFormat(keys, true) : undefined,
-    valueFormat: guessSimpleInputFormat(payloads, true),
+    valueFormat,
   };
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to