This is an automated email from the ASF dual-hosted git repository.
manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 657e7f075f7 Support metadata push mode in
BaseSingleSegmentConversionExecutor (#17632)
657e7f075f7 is described below
commit 657e7f075f7a5d41c45565f4bc5247ca3b4f7293
Author: Shounak kulkarni <[email protected]>
AuthorDate: Mon Feb 16 11:15:18 2026 +0530
Support metadata push mode in BaseSingleSegmentConversionExecutor (#17632)
* Support metadata push mode in BaseSingleSegmentConversionExecutor
* remove URI support
* bypass local fs specific logic for metadata push mode testing
* inject relevant push configs for tasks extending ssce
* extend existing tests to validate metadata push mode
* cleanup
Note: Please note that there are unrelated test failures. We should address
the flaky tests separately.
KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.setUp:292->BaseRealtimeClusterIntegrationTest.setUp:67->startKafka:88->startSchemaRegistry:99
» IllegalState Could not find a valid Docker environment. Please see logs and
check configuration
---
...geMetadataPushMinionClusterIntegrationTest.java | 76 ++++++++++++
.../tests/PurgeMinionClusterIntegrationTest.java | 2 +-
...ntMetadataPushMinionClusterIntegrationTest.java | 71 ++++++++++++
...RefreshSegmentMinionClusterIntegrationTest.java | 2 +-
.../BaseMultipleSegmentsConversionExecutor.java | 115 ++----------------
.../tasks/BaseSingleSegmentConversionExecutor.java | 73 +++++++++---
.../plugin/minion/tasks/BaseTaskExecutor.java | 129 +++++++++++++++++++++
.../pinot/plugin/minion/tasks/MinionTaskUtils.java | 23 +++-
.../minion/tasks/purge/PurgeTaskGenerator.java | 2 +
.../RefreshSegmentTaskGenerator.java | 1 +
.../SegmentGenerationAndPushTaskExecutor.java | 15 +--
.../UpsertCompactionTaskGenerator.java | 1 +
.../plugin/minion/tasks/MinionTaskUtilsTest.java | 15 +++
13 files changed, 385 insertions(+), 140 deletions(-)
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMetadataPushMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMetadataPushMinionClusterIntegrationTest.java
new file mode 100644
index 00000000000..c35cf6926d4
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMetadataPushMinionClusterIntegrationTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration test that runs the Purge minion task with METADATA {@link
+ * org.apache.pinot.spi.ingestion.batch.BatchConfigProperties.SegmentPushType}
to verify the full flow.
+ * Only {@link #testFirstRunPurge()} is enabled; other tests from the base
class are disabled.
+ */
+public class PurgeMetadataPushMinionClusterIntegrationTest extends
PurgeMinionClusterIntegrationTest {
+
+ @Override
+ protected TableTaskConfig getPurgeTaskConfig() {
+ Map<String, String> tableTaskConfigs = new HashMap<>();
+
tableTaskConfigs.put(MinionConstants.PurgeTask.LAST_PURGE_TIME_THREESOLD_PERIOD,
"1d");
+ tableTaskConfigs.put(BatchConfigProperties.PUSH_MODE,
BatchConfigProperties.SegmentPushType.METADATA.name());
+ tableTaskConfigs.put(MinionTaskUtils.ALLOW_METADATA_PUSH_WITH_LOCAL_FS,
"true");
+ return new
TableTaskConfig(Collections.singletonMap(MinionConstants.PurgeTask.TASK_TYPE,
tableTaskConfigs));
+ }
+
+ @Override
+ @Test(enabled = false)
+ public void testPassedDelayTimePurge() {
+ // Disabled: only testFirstRunPurge runs for METADATA push flow.
+ }
+
+ @Override
+ @Test(enabled = false)
+ public void testNotPassedDelayTimePurge() {
+ // Disabled: only testFirstRunPurge runs for METADATA push flow.
+ }
+
+ @Override
+ @Test(enabled = false)
+ public void testPurgeOnOldSegmentsWithIndicesOnNewColumns() {
+ // Disabled: only testFirstRunPurge runs for METADATA push flow.
+ }
+
+ @Override
+ @Test(enabled = false)
+ public void testSegmentDeletionWhenAllRecordsPurged() {
+ // Disabled: only testFirstRunPurge runs for METADATA push flow.
+ }
+
+ @Override
+ @Test(enabled = false)
+ public void testRealtimeLastSegmentPreservation() {
+ // Disabled: only testFirstRunPurge runs for METADATA push flow.
+ }
+}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
index 9aab0fc5771..10d69b04ba2 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
@@ -190,7 +190,7 @@ public class PurgeMinionClusterIntegrationTest extends
BaseClusterIntegrationTes
_tableName = tableName;
}
- private TableTaskConfig getPurgeTaskConfig() {
+ protected TableTaskConfig getPurgeTaskConfig() {
Map<String, String> tableTaskConfigs = new HashMap<>();
tableTaskConfigs.put(MinionConstants.PurgeTask.LAST_PURGE_TIME_THREESOLD_PERIOD,
"1d");
return new
TableTaskConfig(Collections.singletonMap(MinionConstants.PurgeTask.TASK_TYPE,
tableTaskConfigs));
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMetadataPushMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMetadataPushMinionClusterIntegrationTest.java
new file mode 100644
index 00000000000..a434a32194f
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMetadataPushMinionClusterIntegrationTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration test that runs the Refresh Segment minion task with METADATA
{@link
+ * org.apache.pinot.spi.ingestion.batch.BatchConfigProperties.SegmentPushType}
to verify the full flow.
+ * Only {@link #testFirstSegmentRefresh()} is enabled; other tests from the
base class are disabled.
+ */
+public class RefreshSegmentMetadataPushMinionClusterIntegrationTest
+ extends RefreshSegmentMinionClusterIntegrationTest {
+
+ @Override
+ protected TableTaskConfig getRefreshSegmentTaskConfig() {
+ Map<String, String> tableTaskConfigs = new HashMap<>();
+ tableTaskConfigs.put(BatchConfigProperties.PUSH_MODE,
BatchConfigProperties.SegmentPushType.METADATA.name());
+ tableTaskConfigs.put(MinionTaskUtils.ALLOW_METADATA_PUSH_WITH_LOCAL_FS,
"true");
+ return new TableTaskConfig(
+ Collections.singletonMap(MinionConstants.RefreshSegmentTask.TASK_TYPE,
tableTaskConfigs));
+ }
+
+ @Override
+ @Test(priority = 2, enabled = false)
+ public void testValidDatatypeChange() {
+ // Disabled: only testFirstSegmentRefresh runs for METADATA push flow.
+ }
+
+ @Override
+ @Test(priority = 3, enabled = false)
+ public void testIndexChanges() {
+ // Disabled: only testFirstSegmentRefresh runs for METADATA push flow.
+ }
+
+ @Override
+ @Test(priority = 4, enabled = false)
+ public void checkColumnAddition() {
+ // Disabled: only testFirstSegmentRefresh runs for METADATA push flow.
+ }
+
+ @Override
+ @Test(priority = 5, enabled = false)
+ public void checkRefreshNotNecessary() {
+ // Disabled: only testFirstSegmentRefresh runs for METADATA push flow.
+ }
+}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
index c0921838682..335a176c02f 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
@@ -437,7 +437,7 @@ public class RefreshSegmentMinionClusterIntegrationTest
extends BaseClusterInteg
}, 60_000L, "Failed to meet condition");
}
- private TableTaskConfig getRefreshSegmentTaskConfig() {
+ protected TableTaskConfig getRefreshSegmentTaskConfig() {
Map<String, String> tableTaskConfigs = new HashMap<>();
return new TableTaskConfig(
Collections.singletonMap(MinionConstants.RefreshSegmentTask.TASK_TYPE,
tableTaskConfigs));
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index 22351040b04..2ee0d1656c0 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -41,10 +41,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.NameValuePair;
-import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.http.message.BasicNameValuePair;
-import org.apache.pinot.common.auth.AuthProviderUtils;
-import
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.metrics.MinionMeter;
import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
@@ -62,10 +59,8 @@ import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
-import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
-import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,10 +80,6 @@ public abstract class BaseMultipleSegmentsConversionExecutor
extends BaseTaskExe
private static final Logger LOGGER =
LoggerFactory.getLogger(BaseMultipleSegmentsConversionExecutor.class);
private static final String CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID =
"lineageEntryId";
- private static final int DEFAULT_PUSH_ATTEMPTS = 5;
- private static final int DEFAULT_PUSH_PARALLELISM = 1;
- private static final long DEFAULT_PUSH_RETRY_INTERVAL_MILLIS = 1000L;
-
protected MinionConf _minionConf;
// Tracking finer grained progress status.
@@ -442,59 +433,12 @@ public abstract class
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
}
}
- @VisibleForTesting
- PushJobSpec getPushJobSpec(Map<String, String> taskConfigs) {
- PushJobSpec pushJobSpec = new PushJobSpec();
- pushJobSpec.setPushAttempts(DEFAULT_PUSH_ATTEMPTS);
- pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
- pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
-
pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
-
pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
- boolean batchSegmentUpload = Boolean.parseBoolean(taskConfigs.getOrDefault(
- BatchConfigProperties.BATCH_SEGMENT_UPLOAD, "false"));
- if (batchSegmentUpload) {
- pushJobSpec.setBatchSegmentUpload(true);
- }
- return pushJobSpec;
- }
-
@VisibleForTesting
List<Header> getSegmentPushCommonHeaders(PinotTaskConfig pinotTaskConfig,
AuthProvider authProvider,
List<SegmentConversionResult> segmentConversionResults) {
- SegmentConversionResult segmentConversionResult;
- if (segmentConversionResults.size() == 1) {
- segmentConversionResult = segmentConversionResults.get(0);
- } else {
- // Setting to null as the base method expects a single object. This is
ok for now, since the
- // segmentConversionResult is not made use of while generating the
customMap.
- segmentConversionResult = null;
- }
- SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier =
- getSegmentZKMetadataCustomMapModifier(pinotTaskConfig,
segmentConversionResult);
- Header segmentZKMetadataCustomMapModifierHeader =
- new
BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
- segmentZKMetadataCustomMapModifier.toJsonString());
-
- List<Header> headers = new ArrayList<>();
- headers.add(segmentZKMetadataCustomMapModifierHeader);
- headers.addAll(AuthProviderUtils.toRequestHeaders(authProvider));
- return headers;
- }
-
- @VisibleForTesting
- List<NameValuePair> getSegmentPushCommonParams(String tableNameWithType) {
- List<NameValuePair> params = new ArrayList<>();
- params.add(new
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION,
- "true"));
- params.add(new
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME,
- TableNameBuilder.extractRawTableName(tableNameWithType)));
- TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
- if (tableType != null) {
- params.add(new
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE,
tableType.toString()));
- } else {
- throw new RuntimeException("Failed to determine the tableType from name:
" + tableNameWithType);
- }
- return params;
+ SegmentConversionResult segmentConversionResult =
+ segmentConversionResults.size() == 1 ? segmentConversionResults.get(0)
: null;
+ return getSegmentPushMetadataHeaders(pinotTaskConfig, authProvider,
segmentConversionResult);
}
private void pushSegments(String tableNameWithType, Map<String, String>
taskConfigs, PinotTaskConfig pinotTaskConfig,
@@ -516,20 +460,13 @@ public abstract class
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
private void pushSegment(String tableName, Map<String, String> taskConfigs,
URI outputSegmentTarURI,
List<Header> headers, List<NameValuePair> parameters,
SegmentConversionResult segmentConversionResult)
throws Exception {
- String pushMode =
- taskConfigs.getOrDefault(BatchConfigProperties.PUSH_MODE,
BatchConfigProperties.SegmentPushType.TAR.name());
- LOGGER.info("Trying to push Pinot segment with push mode {} from {}",
pushMode, outputSegmentTarURI);
-
- PushJobSpec pushJobSpec = new PushJobSpec();
- pushJobSpec.setPushAttempts(DEFAULT_PUSH_ATTEMPTS);
- pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
- pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
-
pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
-
pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+ BatchConfigProperties.SegmentPushType pushType =
getSegmentPushType(taskConfigs);
+ LOGGER.info("Trying to push Pinot segment with push mode {} from {}",
pushType, outputSegmentTarURI);
+ PushJobSpec pushJobSpec = getPushJobSpec(taskConfigs);
SegmentGenerationJobSpec spec =
generateSegmentGenerationJobSpec(tableName, taskConfigs, pushJobSpec);
- switch
(BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
+ switch (pushType) {
case TAR:
File tarFile = new File(outputSegmentTarURI);
String segmentName = segmentConversionResult.getSegmentName();
@@ -552,43 +489,7 @@ public abstract class
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
}
break;
default:
- throw new UnsupportedOperationException("Unrecognized push mode - " +
pushMode);
- }
- }
-
- private SegmentGenerationJobSpec generateSegmentGenerationJobSpec(String
tableName, Map<String, String> taskConfigs,
- PushJobSpec pushJobSpec) {
-
- TableSpec tableSpec = new TableSpec();
- tableSpec.setTableName(tableName);
-
- PinotClusterSpec pinotClusterSpec = new PinotClusterSpec();
-
pinotClusterSpec.setControllerURI(taskConfigs.get(BatchConfigProperties.PUSH_CONTROLLER_URI));
- PinotClusterSpec[] pinotClusterSpecs = new
PinotClusterSpec[]{pinotClusterSpec};
-
- SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec();
- spec.setPushJobSpec(pushJobSpec);
- spec.setTableSpec(tableSpec);
- spec.setPinotClusterSpecs(pinotClusterSpecs);
- spec.setAuthToken(MinionTaskUtils.resolveAuthToken(taskConfigs));
-
- return spec;
- }
-
- private URI moveSegmentToOutputPinotFS(Map<String, String> taskConfigs, File
localSegmentTarFile)
- throws Exception {
- URI outputSegmentDirURI =
URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
- try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs,
outputSegmentDirURI)) {
- URI outputSegmentTarURI =
-
URI.create(MinionTaskUtils.normalizeDirectoryURI(outputSegmentDirURI) +
localSegmentTarFile.getName());
- if
(!Boolean.parseBoolean(taskConfigs.get(BatchConfigProperties.OVERWRITE_OUTPUT))
&& outputFileFS.exists(
- outputSegmentTarURI)) {
- throw new RuntimeException("Output file: " + outputSegmentTarURI + "
already exists. Set 'overwriteOutput' to "
- + "true to ignore this error");
- } else {
- outputFileFS.copyFromLocalFile(localSegmentTarFile,
outputSegmentTarURI);
- }
- return outputSegmentTarURI;
+ throw new UnsupportedOperationException("Unrecognized push mode - " +
pushType);
}
}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
index 965a00b3bb3..d3a0494dce7 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
@@ -21,8 +21,8 @@ package org.apache.pinot.plugin.minion.tasks;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.File;
+import java.net.URI;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -31,7 +31,6 @@ import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.NameValuePair;
import org.apache.hc.core5.http.message.BasicHeader;
-import org.apache.hc.core5.http.message.BasicNameValuePair;
import org.apache.pinot.common.auth.AuthProviderUtils;
import
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.metrics.MinionMeter;
@@ -43,8 +42,13 @@ import org.apache.pinot.minion.event.MinionEventObserver;
import org.apache.pinot.minion.event.MinionEventObservers;
import org.apache.pinot.minion.exception.TaskCancelledException;
import org.apache.pinot.plugin.minion.tasks.purge.PurgeTaskExecutor;
+import org.apache.pinot.segment.local.utils.SegmentPushUtils;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -184,22 +188,27 @@ public abstract class BaseSingleSegmentConversionExecutor
extends BaseTaskExecut
httpHeaders.add(segmentZKMetadataCustomMapModifierHeader);
httpHeaders.addAll(AuthProviderUtils.toRequestHeaders(authProvider));
- // Set parameters for upload request.
- NameValuePair enableParallelPushProtectionParameter =
- new
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION,
"true");
- NameValuePair tableNameParameter = new
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME,
- TableNameBuilder.extractRawTableName(tableNameWithType));
- NameValuePair tableTypeParameter = new
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE,
-
TableNameBuilder.getTableTypeFromTableName(tableNameWithType).toString());
- List<NameValuePair> parameters =
- Arrays.asList(enableParallelPushProtectionParameter,
tableNameParameter, tableTypeParameter);
-
- // Upload the tarred segment
- _eventObserver.notifyProgress(_pinotTaskConfig, "Uploading segment: " +
segmentName);
+ // Set parameters for upload request (shared with metadata push).
+ List<NameValuePair> parameters =
getSegmentPushCommonParams(tableNameWithType);
+
+ // Upload the tarred segment using the configured push mode (TAR or
METADATA)
+ BatchConfigProperties.SegmentPushType pushType =
getSegmentPushType(configs);
+ _eventObserver.notifyProgress(_pinotTaskConfig, "Uploading segment: " +
segmentName + " (push mode: " + pushType
+ + ")");
boolean uploadSuccessful = true;
try {
- SegmentConversionUtils.uploadSegment(configs, httpHeaders, parameters,
tableNameWithType, segmentName,
- uploadURL, convertedTarredSegmentFile);
+ switch (pushType) {
+ case TAR:
+ SegmentConversionUtils.uploadSegment(configs, httpHeaders,
parameters, tableNameWithType, segmentName,
+ uploadURL, convertedTarredSegmentFile);
+ break;
+ case METADATA:
+ uploadSegmentWithMetadata(configs, pinotTaskConfig,
segmentConversionResult, authProvider, parameters,
+ tableNameWithType, convertedTarredSegmentFile);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unrecognized push mode: "
+ pushType);
+ }
} catch (Exception e) {
uploadSuccessful = false;
_minionMetrics.addMeteredTableValue(tableNameWithType,
MinionMeter.SEGMENT_UPLOAD_FAIL_COUNT, 1L);
@@ -220,6 +229,38 @@ public abstract class BaseSingleSegmentConversionExecutor
extends BaseTaskExecut
}
}
+ /**
+ * Pushes the segment in METADATA mode: copies the tarred segment
to the output PinotFS and sends segment
+ * URI and metadata to the controller. Requires {@link
BatchConfigProperties#OUTPUT_SEGMENT_DIR_URI} and
+ * {@link BatchConfigProperties#PUSH_CONTROLLER_URI} in configs.
+ */
+ private void uploadSegmentWithMetadata(Map<String, String> configs,
PinotTaskConfig pinotTaskConfig,
+ SegmentConversionResult segmentConversionResult, AuthProvider
authProvider, List<NameValuePair> parameters,
+ String tableNameWithType, File convertedTarredSegmentFile)
+ throws Exception {
+ if (!configs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
+ throw new RuntimeException("Output dir URI missing for metadata push.
Set "
+ + BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI + " in task config.");
+ }
+ URI outputSegmentDirURI =
URI.create(configs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+ URI outputSegmentTarURI = moveSegmentToOutputPinotFS(configs,
convertedTarredSegmentFile);
+ LOGGER.info("Moved generated segment from [{}] to location: [{}]",
convertedTarredSegmentFile, outputSegmentTarURI);
+
+ PushJobSpec pushJobSpec = getPushJobSpec(configs);
+ SegmentGenerationJobSpec spec = generateSegmentGenerationJobSpec(
+ TableNameBuilder.extractRawTableName(tableNameWithType), configs,
pushJobSpec);
+
+ List<Header> metadataHeaders =
getSegmentPushMetadataHeaders(pinotTaskConfig, authProvider,
+ segmentConversionResult);
+
+ try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(configs,
outputSegmentDirURI)) {
+ Map<String, String> segmentUriToTarPathMap =
SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI,
+ pushJobSpec, new String[]{outputSegmentTarURI.toString()});
+ SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS,
segmentUriToTarPathMap, metadataHeaders,
+ parameters);
+ }
+ }
+
// For tests only.
@VisibleForTesting
public void setMinionEventObserver(MinionEventObserver observer) {
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
index 061c43ea4cc..8dbc8ed3b8a 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseTaskExecutor.java
@@ -21,16 +21,24 @@ package org.apache.pinot.plugin.minion.tasks;
import com.google.common.base.Preconditions;
import java.io.File;
import java.net.URI;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.NameValuePair;
+import org.apache.hc.core5.http.message.BasicHeader;
+import org.apache.hc.core5.http.message.BasicNameValuePair;
+import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.metrics.MinionMeter;
import org.apache.pinot.common.metrics.MinionMetrics;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.core.util.PeerServerSegmentFinder;
@@ -38,7 +46,15 @@ import org.apache.pinot.minion.MinionContext;
import org.apache.pinot.minion.executor.PinotTaskExecutor;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,6 +62,9 @@ import org.slf4j.LoggerFactory;
public abstract class BaseTaskExecutor implements PinotTaskExecutor {
protected static final Logger LOGGER =
LoggerFactory.getLogger(BaseTaskExecutor.class);
protected static final MinionContext MINION_CONTEXT =
MinionContext.getInstance();
+ protected static final int SEGMENT_PUSH_DEFAULT_ATTEMPTS = 5;
+ protected static final int SEGMENT_PUSH_DEFAULT_PARALLELISM = 1;
+ protected static final long SEGMENT_PUSH_DEFAULT_RETRY_INTERVAL_MILLIS =
1000L;
protected boolean _cancelled = false;
protected final MinionMetrics _minionMetrics = MinionMetrics.get();
@@ -161,4 +180,114 @@ public abstract class BaseTaskExecutor implements
PinotTaskExecutor {
}
return indexDir;
}
+
+ /**
+ * Builds a {@link PushJobSpec} from task configs. Used for both TAR and METADATA push modes.
+ */
+ protected PushJobSpec getPushJobSpec(Map<String, String> configs) {
+ PushJobSpec pushJobSpec = new PushJobSpec();
+ pushJobSpec.setPushAttempts(SEGMENT_PUSH_DEFAULT_ATTEMPTS);
+ pushJobSpec.setPushParallelism(SEGMENT_PUSH_DEFAULT_PARALLELISM);
+ pushJobSpec.setPushRetryIntervalMillis(SEGMENT_PUSH_DEFAULT_RETRY_INTERVAL_MILLIS);
+ pushJobSpec.setSegmentUriPrefix(configs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
+ pushJobSpec.setSegmentUriSuffix(configs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));
+ boolean batchSegmentUpload = Boolean.parseBoolean(configs.getOrDefault(
+ BatchConfigProperties.BATCH_SEGMENT_UPLOAD, "false"));
+ if (batchSegmentUpload) {
+ pushJobSpec.setBatchSegmentUpload(true);
+ }
+ return pushJobSpec;
+ }
+
+ /**
+ * Builds a {@link SegmentGenerationJobSpec} for segment push (TAR or METADATA). Requires
+ * {@link BatchConfigProperties#PUSH_CONTROLLER_URI} in configs for METADATA push.
+ */
+ protected SegmentGenerationJobSpec generateSegmentGenerationJobSpec(String tableName, Map<String, String> configs,
+ PushJobSpec pushJobSpec) {
+ TableSpec tableSpec = new TableSpec();
+ tableSpec.setTableName(tableName);
+
+ PinotClusterSpec pinotClusterSpec = new PinotClusterSpec();
+ pinotClusterSpec.setControllerURI(configs.get(BatchConfigProperties.PUSH_CONTROLLER_URI));
+ SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec();
+ spec.setPushJobSpec(pushJobSpec);
+ spec.setTableSpec(tableSpec);
+ spec.setPinotClusterSpecs(new PinotClusterSpec[]{pinotClusterSpec});
+ spec.setAuthToken(MinionTaskUtils.resolveAuthToken(configs));
+ return spec;
+ }
+
+ /**
+ * Copies the local segment tar file to the output PinotFS. Requires
+ * {@link BatchConfigProperties#OUTPUT_SEGMENT_DIR_URI} in configs.
+ *
+ * @return the URI of the segment tar on the output filesystem
+ */
+ protected URI moveSegmentToOutputPinotFS(Map<String, String> configs, File localSegmentTarFile)
+ throws Exception {
+ URI outputSegmentDirURI = URI.create(configs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+ try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(configs, outputSegmentDirURI)) {
+ URI outputSegmentTarURI = URI.create(MinionTaskUtils.normalizeDirectoryURI(outputSegmentDirURI)
+ + URIUtils.encode(localSegmentTarFile.getName()));
+ if (!Boolean.parseBoolean(configs.get(BatchConfigProperties.OVERWRITE_OUTPUT))
+ && outputFileFS.exists(outputSegmentTarURI)) {
+ throw new RuntimeException("Output file: " + outputSegmentTarURI + " already exists. Set 'overwriteOutput' to "
+ + "true to ignore this error");
+ }
+ outputFileFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
+ return outputSegmentTarURI;
+ }
+ }
+
+ /**
+ * Returns HTTP parameters common to segment upload and metadata push (parallel push protection, table name, type).
+ */
+ protected List<NameValuePair> getSegmentPushCommonParams(String tableNameWithType) {
+ List<NameValuePair> params = new ArrayList<>();
+ params.add(new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION,
+ "true"));
+ params.add(new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME,
+ TableNameBuilder.extractRawTableName(tableNameWithType)));
+ TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType != null) {
+ params.add(new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE, tableType.toString()));
+ } else {
+ throw new RuntimeException("Failed to determine the tableType from name: " + tableNameWithType);
+ }
+ return params;
+ }
+
+ /**
+ * Returns HTTP headers for segment metadata push (ZK metadata custom map modifier + auth). Used when pushing
+ * metadata to the controller instead of uploading the tar via HTTP.
+ *
+ * @param segmentConversionResult the conversion result for the segment; may be null when building headers for
+ * multiple segments where a single modifier does not apply
+ */
+ protected List<Header> getSegmentPushMetadataHeaders(PinotTaskConfig pinotTaskConfig, AuthProvider authProvider,
+ SegmentConversionResult segmentConversionResult) {
+ SegmentZKMetadataCustomMapModifier modifier =
+ getSegmentZKMetadataCustomMapModifier(pinotTaskConfig, segmentConversionResult);
+ Header modifierHeader =
+ new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
+ modifier.toJsonString());
+ List<Header> headers = new ArrayList<>();
+ headers.add(modifierHeader);
+ headers.addAll(AuthProviderUtils.toRequestHeaders(authProvider));
+ return headers;
+ }
+
+ /**
+ * Returns the segment push mode for upload. Default is TAR (HTTP upload). Subclasses may override to use METADATA
+ * mode (move segment to output PinotFS and send metadata to controller) when needed.
+ *
+ * @param configs task configs; may contain {@link BatchConfigProperties#PUSH_MODE}
+ * @return push type (TAR or METADATA)
+ */
+ protected BatchConfigProperties.SegmentPushType getSegmentPushType(Map<String, String> configs) {
+ String pushMode = configs.getOrDefault(BatchConfigProperties.PUSH_MODE,
+ BatchConfigProperties.SegmentPushType.TAR.name());
+ return BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase());
+ }
}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
index 2da96335b75..ca197849210 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
@@ -72,6 +72,12 @@ public class MinionTaskUtils {
public static final String DATETIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
public static final String UTC = "UTC";
+ /**
+ * When true, allows METADATA push mode with local FS output dir. Intended for integration tests only.
+ * Production should leave this unset (defaults to false); local FS then always uses TAR push.
+ */
+ public static final String ALLOW_METADATA_PUSH_WITH_LOCAL_FS = "allowMetadataPushWithLocalFs";
+
private MinionTaskUtils() {
}
@@ -191,9 +197,20 @@ public class MinionTaskUtils {
break;
}
} else {
- LOGGER.warn("Local output dir found, defaulting to TAR: {}.", outputSegmentDirURI);
- singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
- BatchConfigProperties.SegmentPushType.TAR.toString());
+ boolean allowMetadataPushWithLocalFs = Boolean.parseBoolean(
+ taskConfigs.getOrDefault(ALLOW_METADATA_PUSH_WITH_LOCAL_FS, "false"));
+ if (allowMetadataPushWithLocalFs && pushMode != null) {
+ // Override for integration tests: respect explicit push mode with local FS
+ singleFileGenerationTaskConfig.put(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI,
+ outputSegmentDirURI.toString());
+ singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
+ segmentPushType.toString());
+ } else {
+ // Production: default to TAR for local output dir
+ LOGGER.warn("Local output dir found, defaulting to TAR: {}.", outputSegmentDirURI);
+ singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE,
+ BatchConfigProperties.SegmentPushType.TAR.toString());
+ }
}
singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_CONTROLLER_URI,
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
index 298927b0c97..16506881cef 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
@@ -34,6 +34,7 @@ import
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator
import
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
import org.apache.pinot.spi.annotations.minion.TaskGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
@@ -122,6 +123,7 @@ public class PurgeTaskGenerator extends BaseTaskGenerator {
continue;
}
Map<String, String> configs = new
HashMap<>(getBaseTaskConfigs(tableConfig, List.of(segmentName)));
+ configs.putAll(MinionTaskUtils.getPushTaskConfig(tableName, taskConfigs, _clusterInfoAccessor));
Long tsLastPurge;
if (segmentZKMetadata.getCustomMap() != null) {
tsLastPurge = Long.valueOf(segmentZKMetadata.getCustomMap()
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java
index 81263798a42..5dc1ca2f062 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java
@@ -127,6 +127,7 @@ public class RefreshSegmentTaskGenerator extends
BaseTaskGenerator {
}
Map<String, String> configs = new
HashMap<>(getBaseTaskConfigs(tableConfig, List.of(segmentName)));
+ configs.putAll(MinionTaskUtils.getPushTaskConfig(tableNameWithType, taskConfigs, _clusterInfoAccessor));
configs.put(MinionConstants.DOWNLOAD_URL_KEY,
segmentZKMetadata.getDownloadUrl());
configs.put(MinionConstants.UPLOAD_URL_KEY,
_clusterInfoAccessor.getVipUrlForLeadController(tableNameWithType) +
"/segments");
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutor.java
index dc42ca8eda9..9c71c50f6ec 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutor.java
@@ -236,22 +236,13 @@ public class SegmentGenerationAndPushTaskExecutor extends
BaseTaskExecutor {
return spec;
}
- private URI moveSegmentToOutputPinotFS(Map<String, String> taskConfigs, File localSegmentTarFile)
+ @Override
+ protected URI moveSegmentToOutputPinotFS(Map<String, String> taskConfigs, File localSegmentTarFile)
throws Exception {
if
(!taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
return localSegmentTarFile.toURI();
}
- URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
- try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
- URI outputSegmentTarURI = URI.create(outputSegmentDirURI + localSegmentTarFile.getName());
- if (!Boolean.parseBoolean(taskConfigs.get(BatchConfigProperties.OVERWRITE_OUTPUT)) && outputFileFS.exists(
- outputSegmentTarURI)) {
- LOGGER.warn("Not overwrite existing output segment tar file: {}", outputFileFS.exists(outputSegmentTarURI));
- } else {
- outputFileFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
- }
- return outputSegmentTarURI;
- }
+ return super.moveSegmentToOutputPinotFS(taskConfigs, localSegmentTarFile);
}
private File tarSegmentDir(SegmentGenerationTaskSpec taskSpec, String
segmentName)
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
index c704d5a6b8f..02e15b9d328 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
@@ -187,6 +187,7 @@ public class UpsertCompactionTaskGenerator extends
BaseTaskGenerator {
continue;
}
Map<String, String> configs = new
HashMap<>(getBaseTaskConfigs(tableConfig, List.of(segment.getSegmentName())));
+ configs.putAll(MinionTaskUtils.getPushTaskConfig(tableNameWithType, taskConfigs, _clusterInfoAccessor));
configs.put(MinionConstants.DOWNLOAD_URL_KEY,
segment.getDownloadUrl());
configs.put(MinionConstants.UPLOAD_URL_KEY,
_clusterInfoAccessor.getVipUrlForLeadController(tableNameWithType)
+ "/segments");
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
index 1e804d7e6c2..0ad879896f3 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
@@ -350,4 +350,19 @@ public class MinionTaskUtilsTest {
BatchConfigProperties.SegmentPushType.TAR.toString());
assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_CONTROLLER_URI),
"http://localhost:9000");
}
+
+ @Test
+ public void testGetPushTaskConfigMETADATAPushModeWithLocalOutputDir() {
+ Map<String, String> taskConfig = new HashMap<>();
+ taskConfig.put(BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.METADATA.toString());
+ taskConfig.put(MinionTaskUtils.ALLOW_METADATA_PUSH_WITH_LOCAL_FS, "true");
+ Map<String, String> pushTaskConfigs = MinionTaskUtils.getPushTaskConfig(_tableConfig.getTableName(), taskConfig,
+ getMockClusterInfo("/data/dir", "http://localhost:9000"));
+
+ assertEquals(pushTaskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI), "/data/dir/myTable");
+ assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_MODE),
+ BatchConfigProperties.SegmentPushType.METADATA.toString());
+ assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_CONTROLLER_URI), "http://localhost:9000");
+ assertEquals(pushTaskConfigs.size(), 4);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]