This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 61edb7af8e9 MergeRollupTask config behaviour fix for
`output.segment.dir.uri` (#17015)
61edb7af8e9 is described below
commit 61edb7af8e9304df3f1123570b85ede651f66e3d
Author: Tommaso Peresson <[email protected]>
AuthorDate: Wed Nov 19 18:19:28 2025 +0000
MergeRollupTask config behaviour fix for `output.segment.dir.uri` (#17015)
---
.../pinot/plugin/minion/tasks/MinionTaskUtils.java | 60 +++++++++-----
.../plugin/minion/tasks/MinionTaskUtilsTest.java | 91 ++++++++++++++++++++++
2 files changed, 131 insertions(+), 20 deletions(-)
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
index b3762355700..9d4379b63e4 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
@@ -104,40 +104,60 @@ public class MinionTaskUtils {
return PinotFSFactory.create(fileURIScheme);
}
+ public static URI getOutputSegmentDirURI(Map<String, String> taskConfigs,
ClusterInfoAccessor clusterInfoAccessor,
+ String tableName) {
+ // taskConfigs has priority over clusterInfo configs for
output.segment.dir.uri
+ String outputDir =
taskConfigs.getOrDefault(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI,
+ normalizeDirectoryURI(clusterInfoAccessor.getDataDir()) +
TableNameBuilder.extractRawTableName(tableName));
+ return URI.create(outputDir);
+ }
+
public static Map<String, String> getPushTaskConfig(String tableName,
Map<String, String> taskConfigs,
ClusterInfoAccessor clusterInfoAccessor) {
+ Map<String, String> singleFileGenerationTaskConfig = new
HashMap<>(taskConfigs);
try {
String pushMode = IngestionConfigUtils.getPushMode(taskConfigs);
- Map<String, String> singleFileGenerationTaskConfig = new
HashMap<>(taskConfigs);
- if (pushMode == null || pushMode.toUpperCase()
-
.contentEquals(BatchConfigProperties.SegmentPushType.TAR.toString())) {
- singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
- BatchConfigProperties.SegmentPushType.TAR.toString());
+ // Default value for Segment Push Type is TAR.
+ BatchConfigProperties.SegmentPushType segmentPushType;
+ if (pushMode == null) {
+ segmentPushType = BatchConfigProperties.SegmentPushType.TAR;
} else {
- URI outputDirURI = URI.create(
- normalizeDirectoryURI(clusterInfoAccessor.getDataDir()) +
TableNameBuilder.extractRawTableName(tableName));
- String outputDirURIScheme = outputDirURI.getScheme();
+ segmentPushType =
BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase());
+ }
- if (!isLocalOutputDir(outputDirURIScheme)) {
-
singleFileGenerationTaskConfig.put(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI,
outputDirURI.toString());
- if
(pushMode.toUpperCase().contentEquals(BatchConfigProperties.SegmentPushType.URI.toString()))
{
+ URI outputSegmentDirURI = getOutputSegmentDirURI(taskConfigs,
clusterInfoAccessor, tableName);
+ if (!isLocalOutputDir(outputSegmentDirURI.getScheme())) {
+ switch (segmentPushType) {
+ case URI:
+
singleFileGenerationTaskConfig.put(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI,
+ outputSegmentDirURI.toString());
LOGGER.warn("URI push type is not supported in this task.
Switching to METADATA push");
- pushMode =
BatchConfigProperties.SegmentPushType.METADATA.toString();
- }
- singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
pushMode);
- } else {
- LOGGER.warn("segment upload with METADATA push is not supported with
local output dir: {}."
- + " Switching to TAR push.", outputDirURI);
- singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
- BatchConfigProperties.SegmentPushType.TAR.toString());
+ singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
+ BatchConfigProperties.SegmentPushType.METADATA.toString());
+ break;
+ case METADATA:
+
singleFileGenerationTaskConfig.put(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI,
+ outputSegmentDirURI.toString());
+ break;
+ default:
+ singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
+ BatchConfigProperties.SegmentPushType.TAR.toString());
+ break;
}
+ } else {
+ LOGGER.warn("Local output dir found, defaulting to TAR: {}.",
outputSegmentDirURI);
+ singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
+ BatchConfigProperties.SegmentPushType.TAR.toString());
}
+
singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_CONTROLLER_URI,
clusterInfoAccessor.getVipUrlForLeadController(tableName));
return singleFileGenerationTaskConfig;
} catch (Exception e) {
- return taskConfigs;
+ singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
+ BatchConfigProperties.SegmentPushType.TAR.toString());
+ return singleFileGenerationTaskConfig;
}
}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
index a5ff73f8063..1e804d7e6c2 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -31,10 +32,14 @@ import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.filesystem.LocalPinotFS;
import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.utils.Enablement;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
@@ -43,6 +48,16 @@ import static org.testng.Assert.expectThrows;
public class MinionTaskUtilsTest {
+ TableConfig _tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("dateTime").build();
+
+ public ClusterInfoAccessor getMockClusterInfo(String dataDir, String vipUrl)
{
+ ClusterInfoAccessor mockClusterInfo = mock(ClusterInfoAccessor.class);
+ when(mockClusterInfo.getDataDir()).thenReturn(dataDir);
+ when(mockClusterInfo.getVipUrlForLeadController(any())).thenReturn(vipUrl);
+ return mockClusterInfo;
+ }
+
@Test
public void testGetInputPinotFS()
throws Exception {
@@ -259,4 +274,80 @@ public class MinionTaskUtilsTest {
assertEquals(exception4.getMessage(),
"'snapshot' must not be 'DISABLE' with validDocIdsType:
SNAPSHOT_WITH_DELETE");
}
+
+ @Test
+ public void testGetPushTaskConfigNoConfig() {
+ Map<String, String> taskConfig = new HashMap<>();
+ Map<String, String> pushTaskConfigs =
MinionTaskUtils.getPushTaskConfig(_tableConfig.getTableName(), taskConfig,
+ getMockClusterInfo("/data/dir", "http://localhost:9000"));
+ assertEquals(pushTaskConfigs.size(), 2);
+ assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_MODE),
+ BatchConfigProperties.SegmentPushType.TAR.toString());
+ }
+
+ @Test
+ public void testGetPushTaskConfigURIPushMode() {
+ Map<String, String> taskConfig = new HashMap<>();
+ taskConfig.put(BatchConfigProperties.PUSH_MODE,
BatchConfigProperties.SegmentPushType.URI.toString());
+ Map<String, String> pushTaskConfigs =
MinionTaskUtils.getPushTaskConfig(_tableConfig.getTableName(), taskConfig,
+ getMockClusterInfo("/data/dir", "http://localhost:9000"));
+ assertEquals(pushTaskConfigs.size(), 2);
+ assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_MODE),
+ BatchConfigProperties.SegmentPushType.TAR.toString());
+
assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_CONTROLLER_URI),
"http://localhost:9000");
+ }
+
+ @Test
+ public void testGetPushTaskConfigURIPushModeDeepStoreControllerInfo() {
+ Map<String, String> taskConfig = new HashMap<>();
+ taskConfig.put(BatchConfigProperties.PUSH_MODE,
BatchConfigProperties.SegmentPushType.URI.toString());
+ Map<String, String> pushTaskConfigs =
MinionTaskUtils.getPushTaskConfig(_tableConfig.getTableName(), taskConfig,
+ getMockClusterInfo("hdfs://data/dir", "http://localhost:9000"));
+ assertEquals(pushTaskConfigs.size(), 3);
+ assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_MODE),
+ BatchConfigProperties.SegmentPushType.METADATA.toString());
+
assertEquals(pushTaskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI),
"hdfs://data/dir/myTable");
+
assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_CONTROLLER_URI),
"http://localhost:9000");
+ }
+
+ @Test
+ public void testGetPushTaskConfigMETADATAPushModeDeepStoreControllerInfo() {
+ Map<String, String> taskConfig = new HashMap<>();
+ taskConfig.put(BatchConfigProperties.PUSH_MODE,
BatchConfigProperties.SegmentPushType.METADATA.toString());
+ ClusterInfoAccessor mockClusterInfo =
getMockClusterInfo("hdfs://data/dir", "http://localhost:9000");
+ Map<String, String> pushTaskConfigs =
+ MinionTaskUtils.getPushTaskConfig(_tableConfig.getTableName(),
taskConfig, mockClusterInfo);
+
assertEquals(pushTaskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI),
"hdfs://data/dir/myTable");
+ assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_MODE),
+ BatchConfigProperties.SegmentPushType.METADATA.toString());
+ assertEquals(pushTaskConfigs.size(), 3);
+ }
+
+ @Test
+ public void
testGetPushTaskConfigMETADATAPushModeDeepStoreOutputUriTaskConfig() {
+ Map<String, String> taskConfig = new HashMap<>();
+ taskConfig.put(BatchConfigProperties.PUSH_MODE,
BatchConfigProperties.SegmentPushType.METADATA.toString());
+ taskConfig.put(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI,
"hdfs://data/dir/myTable");
+ Map<String, String> pushTaskConfigs =
MinionTaskUtils.getPushTaskConfig(_tableConfig.getTableName(), taskConfig,
+ getMockClusterInfo("/data/dir", "http://localhost:9000"));
+
+
assertEquals(pushTaskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI),
"hdfs://data/dir/myTable");
+ assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_MODE),
+ BatchConfigProperties.SegmentPushType.METADATA.toString());
+
assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_CONTROLLER_URI),
"http://localhost:9000");
+ assertEquals(pushTaskConfigs.size(), 3);
+ }
+
+ @Test
+ public void testGetPushTaskConfigTARPushMode() {
+ Map<String, String> taskConfig = new HashMap<>();
+ taskConfig.put(BatchConfigProperties.PUSH_MODE,
BatchConfigProperties.SegmentPushType.TAR.toString());
+ Map<String, String> pushTaskConfigs =
MinionTaskUtils.getPushTaskConfig(_tableConfig.getTableName(), taskConfig,
+ getMockClusterInfo("/data/dir", "http://localhost:9000"));
+
+ assertEquals(pushTaskConfigs.size(), 2);
+ assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_MODE),
+ BatchConfigProperties.SegmentPushType.TAR.toString());
+
assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_CONTROLLER_URI),
"http://localhost:9000");
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org