This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 61edb7af8e9 MergeRollupTask config behaviour fix for `output.segment.dir.uri` (#17015)
61edb7af8e9 is described below

commit 61edb7af8e9304df3f1123570b85ede651f66e3d
Author: Tommaso Peresson <[email protected]>
AuthorDate: Wed Nov 19 18:19:28 2025 +0000

    MergeRollupTask config behaviour fix for `output.segment.dir.uri` (#17015)
---
 .../pinot/plugin/minion/tasks/MinionTaskUtils.java | 60 +++++++++-----
 .../plugin/minion/tasks/MinionTaskUtilsTest.java   | 91 ++++++++++++++++++++++
 2 files changed, 131 insertions(+), 20 deletions(-)

diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
index b3762355700..9d4379b63e4 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
@@ -104,40 +104,60 @@ public class MinionTaskUtils {
     return PinotFSFactory.create(fileURIScheme);
   }
 
+  public static URI getOutputSegmentDirURI(Map<String, String> taskConfigs, ClusterInfoAccessor clusterInfoAccessor,
+      String tableName) {
+    // taskConfigs has priority over clusterInfo configs for output.segment.dir.uri
+    String outputDir = taskConfigs.getOrDefault(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI,
+        normalizeDirectoryURI(clusterInfoAccessor.getDataDir()) + TableNameBuilder.extractRawTableName(tableName));
+    return URI.create(outputDir);
+  }
+
   public static Map<String, String> getPushTaskConfig(String tableName, Map<String, String> taskConfigs,
       ClusterInfoAccessor clusterInfoAccessor) {
+    Map<String, String> singleFileGenerationTaskConfig = new HashMap<>(taskConfigs);
     try {
       String pushMode = IngestionConfigUtils.getPushMode(taskConfigs);
 
-      Map<String, String> singleFileGenerationTaskConfig = new HashMap<>(taskConfigs);
-      if (pushMode == null || pushMode.toUpperCase()
-          .contentEquals(BatchConfigProperties.SegmentPushType.TAR.toString())) {
-        singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
-            BatchConfigProperties.SegmentPushType.TAR.toString());
+      // Default value for Segment Push Type is TAR.
+      BatchConfigProperties.SegmentPushType segmentPushType;
+      if (pushMode == null) {
+        segmentPushType = BatchConfigProperties.SegmentPushType.TAR;
       } else {
-        URI outputDirURI = URI.create(
-            normalizeDirectoryURI(clusterInfoAccessor.getDataDir()) + TableNameBuilder.extractRawTableName(tableName));
-        String outputDirURIScheme = outputDirURI.getScheme();
+        segmentPushType = BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase());
+      }
 
-        if (!isLocalOutputDir(outputDirURIScheme)) {
-          singleFileGenerationTaskConfig.put(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI, outputDirURI.toString());
-          if (pushMode.toUpperCase().contentEquals(BatchConfigProperties.SegmentPushType.URI.toString())) {
+      URI outputSegmentDirURI = getOutputSegmentDirURI(taskConfigs, clusterInfoAccessor, tableName);
+      if (!isLocalOutputDir(outputSegmentDirURI.getScheme())) {
+        switch (segmentPushType) {
+          case URI:
+            singleFileGenerationTaskConfig.put(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI,
+                outputSegmentDirURI.toString());
             LOGGER.warn("URI push type is not supported in this task. Switching to METADATA push");
-            pushMode = BatchConfigProperties.SegmentPushType.METADATA.toString();
-          }
-          singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE, pushMode);
-        } else {
-          LOGGER.warn("segment upload with METADATA push is not supported with local output dir: {}."
-              + " Switching to TAR push.", outputDirURI);
-          singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
-              BatchConfigProperties.SegmentPushType.TAR.toString());
+            singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
+                BatchConfigProperties.SegmentPushType.METADATA.toString());
+            break;
+          case METADATA:
+            singleFileGenerationTaskConfig.put(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI,
+                outputSegmentDirURI.toString());
+            break;
+          default:
+            singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
+                BatchConfigProperties.SegmentPushType.TAR.toString());
+            break;
         }
+      } else {
+        LOGGER.warn("Local output dir found, defaulting to TAR: {}.", outputSegmentDirURI);
+        singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
+            BatchConfigProperties.SegmentPushType.TAR.toString());
       }
+
       singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_CONTROLLER_URI,
           clusterInfoAccessor.getVipUrlForLeadController(tableName));
       return singleFileGenerationTaskConfig;
     } catch (Exception e) {
-      return taskConfigs;
+      singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
+            BatchConfigProperties.SegmentPushType.TAR.toString());
+      return singleFileGenerationTaskConfig;
     }
   }
 
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
index a5ff73f8063..1e804d7e6c2 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -31,10 +32,14 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.filesystem.LocalPinotFS;
 import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.utils.Enablement;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
@@ -43,6 +48,16 @@ import static org.testng.Assert.expectThrows;
 
 public class MinionTaskUtilsTest {
 
+  TableConfig _tableConfig =
+      new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("dateTime").build();
+
+  public ClusterInfoAccessor getMockClusterInfo(String dataDir, String vipUrl) {
+    ClusterInfoAccessor mockClusterInfo = mock(ClusterInfoAccessor.class);
+    when(mockClusterInfo.getDataDir()).thenReturn(dataDir);
+    when(mockClusterInfo.getVipUrlForLeadController(any())).thenReturn(vipUrl);
+    return mockClusterInfo;
+  }
+
   @Test
   public void testGetInputPinotFS()
       throws Exception {
@@ -259,4 +274,80 @@ public class MinionTaskUtilsTest {
     assertEquals(exception4.getMessage(),
         "'snapshot' must not be 'DISABLE' with validDocIdsType: SNAPSHOT_WITH_DELETE");
   }
+
+  @Test
+  public void testGetPushTaskConfigNoConfig() {
+    Map<String, String> taskConfig = new HashMap<>();
+    Map<String, String> pushTaskConfigs = MinionTaskUtils.getPushTaskConfig(_tableConfig.getTableName(), taskConfig,
+        getMockClusterInfo("/data/dir", "http://localhost:9000"));
+    assertEquals(pushTaskConfigs.size(), 2);
+    assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_MODE),
+        BatchConfigProperties.SegmentPushType.TAR.toString());
+  }
+
+  @Test
+  public void testGetPushTaskConfigURIPushMode() {
+    Map<String, String> taskConfig = new HashMap<>();
+    taskConfig.put(BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.URI.toString());
+    Map<String, String> pushTaskConfigs = MinionTaskUtils.getPushTaskConfig(_tableConfig.getTableName(), taskConfig,
+        getMockClusterInfo("/data/dir", "http://localhost:9000"));
+    assertEquals(pushTaskConfigs.size(), 2);
+    assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_MODE),
+        BatchConfigProperties.SegmentPushType.TAR.toString());
+    assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_CONTROLLER_URI), "http://localhost:9000");
+  }
+
+  @Test
+  public void testGetPushTaskConfigURIPushModeDeepStoreControllerInfo() {
+    Map<String, String> taskConfig = new HashMap<>();
+    taskConfig.put(BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.URI.toString());
+    Map<String, String> pushTaskConfigs = MinionTaskUtils.getPushTaskConfig(_tableConfig.getTableName(), taskConfig,
+        getMockClusterInfo("hdfs://data/dir", "http://localhost:9000"));
+    assertEquals(pushTaskConfigs.size(), 3);
+    assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_MODE),
+        BatchConfigProperties.SegmentPushType.METADATA.toString());
+    assertEquals(pushTaskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI), "hdfs://data/dir/myTable");
+    assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_CONTROLLER_URI), "http://localhost:9000");
+  }
+
+  @Test
+  public void testGetPushTaskConfigMETADATAPushModeDeepStoreControllerInfo() {
+    Map<String, String> taskConfig = new HashMap<>();
+    taskConfig.put(BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.METADATA.toString());
+    ClusterInfoAccessor mockClusterInfo = getMockClusterInfo("hdfs://data/dir", "http://localhost:9000");
+    Map<String, String> pushTaskConfigs =
+        MinionTaskUtils.getPushTaskConfig(_tableConfig.getTableName(), taskConfig, mockClusterInfo);
+    assertEquals(pushTaskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI), "hdfs://data/dir/myTable");
+    assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_MODE),
+        BatchConfigProperties.SegmentPushType.METADATA.toString());
+    assertEquals(pushTaskConfigs.size(), 3);
+  }
+
+  @Test
+  public void testGetPushTaskConfigMETADATAPushModeDeepStoreOutputUriTaskConfig() {
+    Map<String, String> taskConfig = new HashMap<>();
+    taskConfig.put(BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.METADATA.toString());
+    taskConfig.put(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI, "hdfs://data/dir/myTable");
+    Map<String, String> pushTaskConfigs = MinionTaskUtils.getPushTaskConfig(_tableConfig.getTableName(), taskConfig,
+        getMockClusterInfo("/data/dir", "http://localhost:9000"));
+
+    assertEquals(pushTaskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI), "hdfs://data/dir/myTable");
+    assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_MODE),
+        BatchConfigProperties.SegmentPushType.METADATA.toString());
+    assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_CONTROLLER_URI), "http://localhost:9000");
+    assertEquals(pushTaskConfigs.size(), 3);
+  }
+
+  @Test
+  public void testGetPushTaskConfigTARPushMode() {
+    Map<String, String> taskConfig = new HashMap<>();
+    taskConfig.put(BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.TAR.toString());
+    Map<String, String> pushTaskConfigs = MinionTaskUtils.getPushTaskConfig(_tableConfig.getTableName(), taskConfig,
+        getMockClusterInfo("/data/dir", "http://localhost:9000"));
+
+    assertEquals(pushTaskConfigs.size(), 2);
+    assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_MODE),
+        BatchConfigProperties.SegmentPushType.TAR.toString());
+    assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_CONTROLLER_URI), "http://localhost:9000");
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to