This is an automated email from the ASF dual-hosted git repository.

vogievetsky pushed a commit to branch fix_concurrent_tasks
in repository https://gitbox.apache.org/repos/asf/druid.git

commit d40427817243eb402532df130e2293f33b5e0701
Author: Vadim Ogievetsky <[email protected]>
AuthorDate: Tue Jan 9 12:42:38 2024 -0800

    Improve handling of concurrent tasks option
---
 web-console/src/components/auto-form/auto-form.tsx | 10 +----
 .../compaction-config-dialog.tsx                   | 27 +++++++++----
 .../compaction-config/compaction-config.tsx        |  9 -----
 .../druid-models/ingestion-spec/ingestion-spec.tsx |  5 ++-
 .../src/views/load-data-view/load-data-view.tsx    | 47 +++++++++-------------
 5 files changed, 44 insertions(+), 54 deletions(-)

diff --git a/web-console/src/components/auto-form/auto-form.tsx b/web-console/src/components/auto-form/auto-form.tsx
index c63f7a7bc50..357046876af 100644
--- a/web-console/src/components/auto-form/auto-form.tsx
+++ b/web-console/src/components/auto-form/auto-form.tsx
@@ -72,10 +72,6 @@ export interface Field<M> {
   hide?: Functor<M, boolean>;
   hideInMore?: Functor<M, boolean>;
   valueAdjustment?: (value: any) => any;
-  /**
-   * An optional callback to transform the value before it is set on the input
-   */
-  adjustValue?: (value: any) => any;
   adjustment?: (model: Partial<M>, oldModel: Partial<M>) => Partial<M>;
   issueWithValue?: (value: any) => string | undefined;
 
@@ -382,14 +378,12 @@ export class AutoForm<T extends Record<string, any>> extends React.PureComponent
     const disabled = AutoForm.evaluateFunctor(field.disabled, model, false);
     const intent = required && modelValue == null ? AutoForm.REQUIRED_INTENT : undefined;
 
-    const adjustedValue = field.adjustValue ? field.adjustValue(shownValue) : shownValue;
-
     return (
       <ButtonGroup large={large}>
         <Button
           intent={intent}
           disabled={disabled}
-          active={adjustedValue === false}
+          active={shownValue === false}
           onClick={() => {
             this.fieldChange(field, false);
             if (onFinalize) onFinalize();
@@ -400,7 +394,7 @@ export class AutoForm<T extends Record<string, any>> extends React.PureComponent
         <Button
           intent={intent}
           disabled={disabled}
-          active={adjustedValue === true}
+          active={shownValue === true}
           onClick={() => {
             this.fieldChange(field, true);
             if (onFinalize) onFinalize();
diff --git a/web-console/src/dialogs/compaction-config-dialog/compaction-config-dialog.tsx b/web-console/src/dialogs/compaction-config-dialog/compaction-config-dialog.tsx
index 8a3a232072e..ec2135e43aa 100644
--- a/web-console/src/dialogs/compaction-config-dialog/compaction-config-dialog.tsx
+++ b/web-console/src/dialogs/compaction-config-dialog/compaction-config-dialog.tsx
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-import { Button, Callout, Classes, Code, Dialog, Intent } from '@blueprintjs/core';
+import { Button, Callout, Classes, Code, Dialog, Intent, Switch } from '@blueprintjs/core';
 import React, { useState } from 'react';
 
 import type { FormJsonTabs } from '../../components';
@@ -26,7 +26,7 @@ import {
   COMPACTION_CONFIG_FIELDS,
   compactionConfigHasLegacyInputSegmentSizeBytesSet,
 } from '../../druid-models';
-import { deepDelete, formatBytesCompact } from '../../utils';
+import { deepDelete, deepGet, deepSet, formatBytesCompact } from '../../utils';
 import { CompactionHistoryDialog } from '../compaction-history-dialog/compaction-history-dialog';
 
 import './compaction-config-dialog.scss';
@@ -96,11 +96,24 @@ export const CompactionConfigDialog = React.memo(function CompactionConfigDialog
       />
       <div className="content">
         {currentTab === 'form' ? (
-          <AutoForm
-            fields={COMPACTION_CONFIG_FIELDS}
-            model={currentConfig}
-            onChange={m => setCurrentConfig(m as CompactionConfig)}
-          />
+          <>
+            <AutoForm
+              fields={COMPACTION_CONFIG_FIELDS}
+              model={currentConfig}
+              onChange={m => setCurrentConfig(m as CompactionConfig)}
+            />
+            <Switch
+              label="Allow concurrent compactions (experimental)"
+              checked={typeof deepGet(currentConfig, 'taskContext.taskLockType') === 'string'}
+              onChange={() => {
+                setCurrentConfig(
+                  (typeof deepGet(currentConfig, 'taskContext.taskLockType') === 'string'
+                    ? deepDelete(currentConfig, 'taskContext.taskLockType')
+                    : deepSet(currentConfig, 'taskContext.taskLockType', 'REPLACE')) as any,
+                );
+              }}
+            />
+          </>
         ) : (
           <JsonInput
             value={currentConfig}
diff --git a/web-console/src/druid-models/compaction-config/compaction-config.tsx b/web-console/src/druid-models/compaction-config/compaction-config.tsx
index a074af7ae14..8c03b6a5786 100644
--- a/web-console/src/druid-models/compaction-config/compaction-config.tsx
+++ b/web-console/src/druid-models/compaction-config/compaction-config.tsx
@@ -354,13 +354,4 @@ export const COMPACTION_CONFIG_FIELDS: Field<CompactionConfig>[] = [
       </>
     ),
   },
-  {
-    name: 'taskContext.taskLockType',
-    type: 'boolean',
-    label: 'Allow concurrent compactions (experimental)',
-    defaultValue: undefined,
-    valueAdjustment: v => (v ? 'REPLACE' : undefined),
-    adjustValue: v => v === 'REPLACE',
-    info: <p>Allows or forbids concurrent compactions.</p>,
-  },
 ];
diff --git a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
index 7dd6bd4e82f..3594f70c992 100644
--- a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
+++ b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
@@ -347,7 +347,10 @@ export function normalizeSpec(spec: Partial<IngestionSpec>): IngestionSpec {
   spec = deepSetIfUnset(spec, 'spec.tuningConfig.type', specType);
 
   if (spec.context?.taskLockType !== undefined) {
-    spec.context.taskLockType = spec.spec?.ioConfig.appendToExisting ? 'APPEND' : 'REPLACE';
+    spec.context.taskLockType =
+      isStreamingSpec(spec) || deepGet(spec, 'spec.ioConfig.appendToExisting')
+        ? 'APPEND'
+        : 'REPLACE';
   }
 
   return spec as IngestionSpec;
diff --git a/web-console/src/views/load-data-view/load-data-view.tsx b/web-console/src/views/load-data-view/load-data-view.tsx
index af39c2d0385..db7e8e0f093 100644
--- a/web-console/src/views/load-data-view/load-data-view.tsx
+++ b/web-console/src/views/load-data-view/load-data-view.tsx
@@ -3134,8 +3134,6 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
     const { spec } = this.state;
     const parallel = deepGet(spec, 'spec.tuningConfig.maxNumConcurrentSubTasks') > 1;
 
-    const appendToExisting = spec.spec?.ioConfig.appendToExisting;
-
     return (
       <>
         <div className="main">
@@ -3161,6 +3159,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
                 name: 'spec.ioConfig.appendToExisting',
                 label: 'Append to existing',
                 type: 'boolean',
+                defined: s => !isStreamingSpec(s),
                 defaultValue: false,
                 // appendToExisting can only be set on 'dynamic' portioning.
                 // We chose to show it always and instead have a specific message, separate from this form, to notify the user of the issue.
@@ -3171,37 +3170,27 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
                   </>
                 ),
               },
-              {
-                name: 'context.taskLockType',
-                type: 'boolean',
-                label: `Allow concurrent ${
-                  appendToExisting ? 'append' : 'replace'
-                } tasks (experimental)`,
-                defaultValue: undefined,
-                valueAdjustment: v => {
-                  if (!v) return undefined;
-
-                  if (isStreamingSpec(spec)) {
-                    return 'APPEND';
-                  } else {
-                    return appendToExisting ? 'APPEND' : 'REPLACE';
-                  }
-                },
-                adjustValue: v => {
-                  if (v === undefined) return false;
-
-                  if (isStreamingSpec(spec)) {
-                    return v === 'APPEND';
-                  }
-
-                  return v === (appendToExisting ? 'APPEND' : 'REPLACE');
-                },
-                info: <p>Allows or forbids concurrent tasks.</p>,
-              },
             ]}
             model={spec}
             onChange={this.updateSpec}
           />
+          <Switch
+            label="Allow concurrent tasks (experimental)"
+            checked={typeof deepGet(spec, 'context.taskLockType') === 'string'}
+            onChange={() => {
+              this.updateSpec(
+                typeof deepGet(spec, 'context.taskLockType') === 'string'
+                  ? deepDelete(spec, 'context.taskLockType')
+                  : deepSet(
+                      spec,
+                      'context.taskLockType',
+                      isStreamingSpec(spec) || deepGet(spec, 'spec.ioConfig.appendToExisting')
+                        ? 'APPEND'
+                        : 'REPLACE',
+                    ),
+              );
+            }}
+          />
         </div>
         <div className="other">
           <H5>Parse error reporting</H5>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to